|
From: <tho...@us...> - 2010-09-23 20:11:07
|
Revision: 3617
http://bigdata.svn.sourceforge.net/bigdata/?rev=3617&view=rev
Author: thompsonbry
Date: 2010-09-23 20:10:58 +0000 (Thu, 23 Sep 2010)
Log Message:
-----------
Finally chased down one bug which I had introduced in QueryResultIterator. I've added a bunch of unit tests and the PipelineType annotation, which will eventually support both vectored and operator-at-a-time evaluation. I am still chasing the bug with multiple chunk messages flowing through the query controller.
Modified Paths:
--------------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java
branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineDelayOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestPipelineUtility.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java
Added Paths:
-----------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/CancelQuery.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestRunState.java
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -167,7 +167,7 @@
* override this method.
*/
BOpEvaluationContext getEvaluationContext();
-
+
/**
* Return <code>true</code> iff this operator is an access path which writes
* on the database.
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -344,7 +344,8 @@
public String toString() {
final StringBuilder sb = new StringBuilder();
- sb.append(getClass().getName());
+// sb.append(getClass().getName());
+ sb.append(super.toString());
sb.append("(");
for (int i = 0; i < args.length; i++) {
final BOp t = args[i];
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -27,31 +27,18 @@
*/
package com.bigdata.bop;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-
import org.apache.log4j.Logger;
import com.bigdata.bop.engine.BOpStats;
import com.bigdata.bop.engine.IChunkMessage;
import com.bigdata.bop.engine.IRunningQuery;
import com.bigdata.bop.engine.RunningQuery;
-import com.bigdata.btree.IIndex;
import com.bigdata.btree.ILocalBTreeView;
-import com.bigdata.btree.IRangeQuery;
import com.bigdata.journal.IIndexManager;
-import com.bigdata.journal.TimestampUtility;
-import com.bigdata.relation.IRelation;
-import com.bigdata.relation.accesspath.AccessPath;
import com.bigdata.relation.accesspath.IAccessPath;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.relation.accesspath.IBlockingBuffer;
-import com.bigdata.relation.locator.IResourceLocator;
-import com.bigdata.relation.rule.IRule;
-import com.bigdata.relation.rule.eval.IJoinNexus;
-import com.bigdata.service.DataService;
import com.bigdata.service.IBigdataFederation;
-import com.bigdata.striterator.IKeyOrder;
import com.ibm.icu.impl.ByteBuffer;
/**
@@ -62,18 +49,10 @@
*/
public class BOpContext<E> extends BOpContextBase {
- static private final Logger log = Logger.getLogger(BOpContext.class);
+ static private final transient Logger log = Logger.getLogger(BOpContext.class);
private final IRunningQuery runningQuery;
-// private final IBigdataFederation<?> fed;
-//
-// private final IIndexManager indexManager;
-//
-// private final long readTimestamp;
-//
-// private final long writeTimestamp;
-
private final int partitionId;
private final BOpStats stats;
@@ -95,60 +74,8 @@
public IRunningQuery getRunningQuery() {
return runningQuery;
}
-
+
/**
- * The {@link IBigdataFederation} IFF the operator is being evaluated on an
- * {@link IBigdataFederation}. When evaluating operations against an
- * {@link IBigdataFederation}, this reference provides access to the
- * scale-out view of the indices and to other bigdata services.
- */
- @Override
- public IBigdataFederation<?> getFederation() {
- return runningQuery.getFederation();
- }
-
- /**
- * The <strong>local</strong> {@link IIndexManager}. Query evaluation occurs
- * against the local indices. In scale-out, query evaluation proceeds shard
- * wise and this {@link IIndexManager} MUST be able to read on the
- * {@link ILocalBTreeView}.
- */
- @Override
- public IIndexManager getIndexManager() {
- return runningQuery.getIndexManager();
- }
-
- /**
- * Return the {@link Executor} on to which the operator may submit tasks.
- * <p>
- * Note: The is the {@link ExecutorService} associated with the
- * <em>local</em> {@link #getIndexManager() index manager}.
- */
- public final Executor getExecutorService() {
- return runningQuery.getIndexManager().getExecutorService();
- }
-
-// /**
-// * The timestamp or transaction identifier against which the query is
-// * reading.
-// *
-// * @deprecated by {@link BOp.Annotations#TIMESTAMP}
-// */
-// public final long getReadTimestamp() {
-// return runningQuery.getReadTimestamp();
-// }
-//
-// /**
-// * The timestamp or transaction identifier against which the query is
-// * writing.
-// *
-// * @deprecated by {@link BOp.Annotations#TIMESTAMP}
-// */
-// public final long getWriteTimestamp() {
-// return runningQuery.getWriteTimestamp();
-// }
-
- /**
* The index partition identifier -or- <code>-1</code> if the index is not
* sharded.
*/
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -27,7 +27,11 @@
*/
package com.bigdata.bop;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
import org.apache.log4j.Logger;
+
import com.bigdata.bop.engine.QueryEngine;
import com.bigdata.btree.IIndex;
import com.bigdata.btree.ILocalBTreeView;
@@ -53,7 +57,7 @@
*/
public class BOpContextBase {
- static private final Logger log = Logger.getLogger(BOpContextBase.class);
+ static private final transient Logger log = Logger.getLogger(BOpContextBase.class);
private final QueryEngine queryEngine;
@@ -63,7 +67,7 @@
* wise and this {@link IIndexManager} MUST be able to read on the
* {@link ILocalBTreeView}.
*/
- public IIndexManager getIndexManager() {
+ final public IIndexManager getIndexManager() {
return queryEngine.getIndexManager();
}
@@ -73,11 +77,21 @@
* {@link IBigdataFederation}, this reference provides access to the
* scale-out view of the indices and to other bigdata services.
*/
- public IBigdataFederation<?> getFederation() {
+ final public IBigdataFederation<?> getFederation() {
return queryEngine.getFederation();
}
/**
+ * Return the {@link Executor} on to which the operator may submit tasks.
+ * <p>
+ * Note: This is the {@link ExecutorService} associated with the
+ * <em>local</em> {@link #getIndexManager() index manager}.
+ */
+ public final Executor getExecutorService() {
+ return getIndexManager().getExecutorService();
+ }
+
+ /**
*
* @param indexManager
* The <strong>local</strong> {@link IIndexManager}. Query
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -27,6 +27,7 @@
package com.bigdata.bop;
+import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -374,7 +375,7 @@
* @param op
* A {@link BOp}.
*
- * @return The index.
+ * @return The index, which is immutable and thread-safe.
*
* @throws DuplicateBOpIdException
* if there are two or more {@link BOp}s having the same
@@ -412,7 +413,8 @@
throw new DuplicateBOpException(t.toString());
}
}
- return map;
+ // wrap to ensure immutable and thread-safe.
+ return Collections.unmodifiableMap(map);
}
/**
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -209,6 +209,16 @@
protected static transient final TimeUnit chunkTimeoutUnit = TimeUnit.MILLISECONDS;
/**
+ * Return the {@link PipelineType} of the operator (default
+ * {@link PipelineType#Vectored}).
+ */
+ public PipelineType getPipelineType() {
+
+ return PipelineType.Vectored;
+
+ }
+
+ /**
* Return <code>true</code> iff {@link #newStats()} must be shared across
* all invocations of {@link #eval(BOpContext)} for this operator for a
* given query (default <code>false</code>).
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -0,0 +1,68 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ * Created on Sep 21, 2010
+ */
+
+package com.bigdata.bop;
+
+/**
+ * The type of pipelining supported by an operator.
+ * <p>
+ * Note: bigdata does not support tuple-at-a-time processing, only vectored and
+ * operator-at-a-time processing. Tuple-at-a-time processing is generally very
+ * inefficient.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public enum PipelineType {
+
+ /**
+ * Vectored operators stream chunks of intermediate results from one
+ * operator to the next using a producer / consumer pattern. Each time a set
+ * of intermediate results is available for a vectored operator, it is
+ * evaluated against those inputs producing another set of intermediate
+ * results for its target operator(s). Vectored operators may be evaluated
+ * many times during a given query and often have excellent parallelism due
+ * to the concurrent evaluation of the different operators on different sets
+ * of intermediate results.
+ */
+ Vectored,
+
+ /**
+ * The operator will run exactly once and must wait for all of its inputs to
+ * be assembled before it runs.
+ * <p>
+ * There are some operations for which this is always true, such as SORT.
+ * Other operations MAY use operator-at-once evaluation in order to benefit
+ * from a combination of more efficient IO patterns and simpler design.
+ * However, pipelined operators using large memory blocks have many of the
+ * benefits of operator-at-once evaluation. By deferring their evaluation
+ * until some minimum number of source data blocks are available, they may
+ * be evaluated once or more than once, depending on the data scale.
+ */
+ OneShot;
+
+}
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -30,6 +30,8 @@
import java.util.Map;
+import cern.colt.Arrays;
+
import com.bigdata.bop.AbstractChunkedOrderedIteratorOp;
import com.bigdata.bop.BOp;
import com.bigdata.bop.Constant;
@@ -414,7 +416,15 @@
for (Map.Entry<String, Object> e : annotations.entrySet()) {
if (!first)
sb.append(", ");
- sb.append(e.getKey() + "=" + e.getValue());
+ // @todo remove relation name hack when making relation name a scalar.
+ if (Annotations.RELATION_NAME.equals(e.getKey())
+ && e.getValue() != null
+ && e.getValue().getClass().isArray()) {
+ sb.append(e.getKey() + "="
+ + Arrays.toString((String[]) e.getValue()));
+ } else {
+ sb.append(e.getKey() + "=" + e.getValue());
+ }
first = false;
}
sb.append("]");
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -35,6 +35,7 @@
import com.bigdata.bop.BOpContext;
import com.bigdata.bop.BindingSetPipelineOp;
import com.bigdata.bop.IBindingSet;
+import com.bigdata.bop.IConstraint;
import com.bigdata.bop.engine.BOpStats;
import com.bigdata.bop.engine.IChunkAccessor;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
@@ -56,6 +57,16 @@
*/
private static final long serialVersionUID = 1L;
+ public interface Annotations extends BindingSetPipelineOp.Annotations {
+
+ /**
+ * An optional {@link IConstraint}[] which places restrictions on the
+ * legal patterns in the variable bindings.
+ */
+ String CONSTRAINTS = CopyBindingSetOp.class.getName() + ".constraints";
+
+ }
+
/**
* Deep copy constructor.
*
@@ -75,10 +86,19 @@
super(args, annotations);
}
+ /**
+ * @see Annotations#CONSTRAINTS
+ */
+ public IConstraint[] constraints() {
+
+ return getProperty(Annotations.CONSTRAINTS, null/* defaultValue */);
+
+ }
+
public FutureTask<Void> eval(final BOpContext<IBindingSet> context) {
- return new FutureTask<Void>(new CopyTask(context));
-
+ return new FutureTask<Void>(new CopyTask(this, context));
+
}
/**
@@ -90,11 +110,19 @@
static private class CopyTask implements Callable<Void> {
private final BOpContext<IBindingSet> context;
-
- CopyTask(final BOpContext<IBindingSet> context) {
-
+
+ /**
+ * The constraint(s) (if any) specified for the copy operator.
+ */
+ final private IConstraint[] constraints;
+
+ CopyTask(final CopyBindingSetOp op,
+ final BOpContext<IBindingSet> context) {
+
this.context = context;
-
+
+ this.constraints = op.constraints();
+
}
public Void call() throws Exception {
@@ -108,9 +136,10 @@
final IBindingSet[] chunk = source.next();
stats.chunksIn.increment();
stats.unitsIn.add(chunk.length);
- sink.add(chunk);
+ final IBindingSet[] tmp = applyConstraints(chunk);
+ sink.add(tmp);
if (sink2 != null)
- sink2.add(chunk);
+ sink2.add(tmp);
}
sink.flush();
if (sink2 != null)
@@ -124,6 +153,56 @@
}
}
- }
+ private IBindingSet[] applyConstraints(final IBindingSet[] chunk) {
+
+ if (constraints == null) {
+ /*
+ * No constraints, copy all binding sets.
+ */
+
+ return chunk;
+
+ }
+
+ /*
+ * Copy binding sets which satisfy the constraint(s).
+ */
+
+ IBindingSet[] t = new IBindingSet[chunk.length];
+
+ int j = 0;
+
+ for (int i = 0; i < chunk.length; i++) {
+
+ final IBindingSet bindingSet = chunk[i];
+
+ if (context.isConsistent(constraints, bindingSet)) {
+
+ t[j++] = bindingSet;
+
+ }
+
+ }
+
+ if (j != chunk.length) {
+
+ // allocate exact size array.
+ final IBindingSet[] tmp = (IBindingSet[]) java.lang.reflect.Array
+ .newInstance(chunk[0].getClass(), j);
+
+ // make a dense copy.
+ System.arraycopy(t/* src */, 0/* srcPos */, tmp/* dst */,
+ 0/* dstPos */, j/* len */);
+
+ t = tmp;
+
+ }
+
+ return t;
+
+ }
+
+ } // class CopyTask
+
}
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -54,7 +54,7 @@
* be mapped across shards or nodes as appropriate for the parent. UNION runs on
* the query controller. In order to avoid routing intermediate results through
* the controller, the {@link BindingSetPipelineOp.Annotations#SINK_REF} of each
- * child operand should be overriden to specify the parent of the UNION
+ * child operand should be overridden to specify the parent of the UNION
* operator.
* <p>
* UNION can not be used when the intermediate results must be routed into the
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -67,21 +67,26 @@
* #of chunks in.
*/
final public CAT chunksIn = new CAT();
+// final public AtomicLong chunksIn = new AtomicLong();
/**
* #of units sets in (tuples, elements, binding sets, etc).
*/
final public CAT unitsIn = new CAT();
+// final public AtomicLong unitsIn = new AtomicLong();
/**
* #of chunks out.
*/
final public CAT chunksOut = new CAT();
+// final public AtomicLong chunksOut = new AtomicLong();
+
/**
* #of units sets in (tuples, elements, binding sets, etc).
*/
final public CAT unitsOut = new CAT();
+// final public AtomicLong unitsOut = new AtomicLong();
/**
* Constructor.
@@ -105,15 +110,20 @@
unitsIn.add(o.unitsIn.get());
unitsOut.add(o.unitsOut.get());
chunksOut.add(o.chunksOut.get());
+// chunksIn.addAndGet(o.chunksIn.get());
+// unitsIn.addAndGet(o.unitsIn.get());
+// unitsOut.addAndGet(o.unitsOut.get());
+// chunksOut.addAndGet(o.chunksOut.get());
}
+
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append(getClass().getName());
- sb.append("{chunksIn=" + chunksIn.estimate_get());
- sb.append(",unitsIn=" + unitsIn.estimate_get());
- sb.append(",chunksOut=" + chunksOut.estimate_get());
- sb.append(",unitsOut=" + unitsOut.estimate_get());
+ sb.append("{chunksIn=" + chunksIn.get());
+ sb.append(",unitsIn=" + unitsIn.get());
+ sb.append(",chunksOut=" + chunksOut.get());
+ sb.append(",unitsOut=" + unitsOut.get());
toString(sb); // extension hook
sb.append("}");
return sb.toString();
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -3,6 +3,8 @@
import java.io.Serializable;
import java.util.UUID;
+import com.bigdata.bop.BOp;
+
/**
* A message sent to the {@link IQueryClient} when an operator is done executing
* for some chunk of inputs.
@@ -53,7 +55,7 @@
* scale-out, this is one per index partition over which the intermediate
* results were mapped.
*/
- final public int sinkChunksOut;
+ final public int sinkMessagesOut;
/**
* The operator identifier for the alternative sink -or- <code>null</code>
@@ -71,7 +73,7 @@
* results were mapped. It is zero if there was no alternative sink for the
* operator.
*/
- final public int altSinkChunksOut;
+ final public int altSinkMessagesOut;
/**
* The statistics for the execution of the bop against the partition on the
@@ -91,10 +93,19 @@
* The node which executed the operator.
* @param cause
* <code>null</code> unless execution halted abnormally.
- * @param chunksOut
- * A map reporting the #of binding set chunks which were output
- * for each downstream operator for which at least one chunk of
- * output was produced.
+ * @param sinkId
+ * The {@link BOp.Annotations#BOP_ID} of the default sink and
+ * <code>null</code> if there is no sink (for example, if this is
+ * the last operator in the pipeline).
+ * @param sinkMessagesOut
+ * The number of {@link IChunkMessage} which were sent to the
+ * operator for the default sink.
+ * @param altSinkId
+ * The {@link BOp.Annotations#BOP_ID} of the alternative sink and
+ * <code>null</code> if there is no alternative sink.
+ * @param altSinkMessagesOut
+ * The number of {@link IChunkMessage} which were sent to the
+ * operator for the alternative sink.
* @param taskStats
* The statistics for the execution of that bop on that shard and
* service.
@@ -103,8 +114,8 @@
//
final UUID queryId, final int bopId, final int partitionId,
final UUID serviceId, Throwable cause, //
- final Integer sinkId, final int sinkChunksOut,//
- final Integer altSinkId, final int altSinkChunksOut,//
+ final Integer sinkId, final int sinkMessagesOut,//
+ final Integer altSinkId, final int altSinkMessagesOut,//
final BOpStats taskStats) {
this.queryId = queryId;
@@ -113,9 +124,9 @@
this.serviceId = serviceId;
this.cause = cause;
this.sinkId = sinkId;
- this.sinkChunksOut = sinkChunksOut;
+ this.sinkMessagesOut = sinkMessagesOut;
this.altSinkId = altSinkId;
- this.altSinkChunksOut = altSinkChunksOut;
+ this.altSinkMessagesOut = altSinkMessagesOut;
this.taskStats = taskStats;
}
@@ -128,9 +139,9 @@
if (cause != null)
sb.append(",cause=" + cause);
sb.append(",sinkId=" + sinkId);
- sb.append(",sinkChunksOut=" + sinkChunksOut);
+ sb.append(",sinkChunksOut=" + sinkMessagesOut);
sb.append(",altSinkId=" + altSinkId);
- sb.append(",altSinkChunksOut=" + altSinkChunksOut);
+ sb.append(",altSinkChunksOut=" + altSinkMessagesOut);
sb.append(",stats=" + taskStats);
sb.append("}");
return sb.toString();
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -49,4 +49,18 @@
*/
void bufferReady(IChunkMessage<IBindingSet> msg) throws RemoteException;
+ /**
+ * Notify a service that the query has been terminated. The peer MUST NOT
+ * cancel the query synchronously as that can lead to a deadlock with the
+ * query controller. Instead, the peer should queue a task to cancel the
+ * query and then return.
+ *
+ * @param queryId
+ * The query identifier.
+ * @param cause
+ * The cause. When <code>null</code>, this is presumed to be
+ * normal query termination.
+ */
+ void cancelQuery(UUID queryId, Throwable cause) throws RemoteException;
+
}
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -56,7 +56,14 @@
IIndexManager getIndexManager();
/**
- * Terminate query evaluation
+ * Cancel the running query (normal termination).
+ * <p>
+ * Note: This method provides a means for an operator to indicate that the
+ * query should halt immediately for reasons other than abnormal
+ * termination.
+ * <p>
+ * Note: For abnormal termination of a query, just throw an exception out of
+ * the query operator implementation.
*/
void halt();
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -48,18 +48,16 @@
private static final Logger log = Logger.getLogger(PipelineUtility.class);
/**
- * Return <code>true</code> iff the <i>runningCountMap</i> AND
- * <i>availableChunkMap</i> map are ZERO (0) for both the given operator and
- * for all operators which proceed the given operator in the tree structure
- * of its operands.
+ * Return <code>true</code> iff the <i>availableChunkCountMap</i> is ZERO (0) for
+ * the given operator and its descendants AND the <i>runningCountMap</i> is
+ * ZERO (0) for the operator and all descendants of the operator. For the
+ * purposes of this method, only {@link BOp#args() operands} are considered
+ * as descendants.
* <p>
- * Note: The movement of the intermediate binding set chunks forms an
- * acyclic directed graph. We can decide whether or not a {@link BOp} in the
- * query plan can be triggered by the current activity pattern by inspecting
- * the {@link BOp} and its operands recursively. If neither the {@link BOp}
- * nor any of its operands (recursively) has non-zero activity then the
- * {@link BOp} can not be triggered and this method will return
- * <code>true</code>.
+ * Note: The movement of the intermediate binding set chunks during query
+ * processing forms an acyclic directed graph. We can decide whether or not
+ * a {@link BOp} in the query plan can be triggered by the current activity
+ * pattern by inspecting the {@link BOp} and its operands recursively.
*
* @param bopId
* The identifier for an operator which appears in the query
@@ -92,8 +90,10 @@
if (queryPlan == null)
throw new IllegalArgumentException();
+
if (queryIndex == null)
throw new IllegalArgumentException();
+
if (availableChunkCountMap == null)
throw new IllegalArgumentException();
@@ -103,7 +103,7 @@
throw new NoSuchBOpException(bopId);
final Iterator<BOp> itr = BOpUtility.preOrderIterator(op);
-
+
while (itr.hasNext()) {
final BOp t = itr.next();
@@ -112,8 +112,17 @@
if (id == null)
continue;
+
{
+ /*
+ * If the operator is running then it is, de facto, "not done."
+ *
+ * If any descendants of the operator are running, then they
+ * could cause the operator to be re-triggered and it is "not
+ * done."
+ */
+
final AtomicLong runningCount = runningCountMap.get(id);
if (runningCount != null && runningCount.get() != 0) {
@@ -125,11 +134,16 @@
return false;
}
-
+
}
{
-
+
+ /*
+ * Any chunks available for the operator in question or any of
+ * its descendants could cause that operator to be triggered.
+ */
+
final AtomicLong availableChunkCount = availableChunkCountMap
.get(id);
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -41,31 +41,16 @@
import org.apache.log4j.Logger;
-import alice.tuprolog.Prolog;
-
import com.bigdata.bop.BOp;
import com.bigdata.bop.BindingSetPipelineOp;
import com.bigdata.bop.IBindingSet;
-import com.bigdata.bop.IPredicate;
-import com.bigdata.bop.bset.Union;
import com.bigdata.btree.BTree;
import com.bigdata.btree.IndexSegment;
import com.bigdata.btree.view.FusedView;
import com.bigdata.journal.IIndexManager;
-import com.bigdata.rdf.internal.IV;
-import com.bigdata.rdf.spo.SPORelation;
-import com.bigdata.relation.IMutableRelation;
-import com.bigdata.relation.IRelation;
-import com.bigdata.relation.accesspath.IElementFilter;
-import com.bigdata.relation.rule.IRule;
-import com.bigdata.relation.rule.Program;
-import com.bigdata.relation.rule.eval.pipeline.DistributedJoinTask;
import com.bigdata.resources.IndexManager;
import com.bigdata.service.IBigdataFederation;
import com.bigdata.service.IDataService;
-import com.bigdata.service.ndx.IAsynchronousWriteBufferFactory;
-import com.bigdata.striterator.ChunkedArrayIterator;
-import com.bigdata.striterator.IChunkedOrderedIterator;
/**
* A class managing execution of concurrent queries against a local
@@ -185,132 +170,6 @@
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
* @version $Id$
*
- *
- * FIXME Unit tests for non-distinct {@link IElementFilter}s on an
- * {@link IPredicate}, unit tests for distinct element filter on an
- * {@link IPredicate} which is capable of distributed operations. Do not use
- * distinct where not required (SPOC, only one graph, etc).
- * <p>
- * It seems like the right way to approach this is by unifying the stackable CTC
- * striterator pattern with the chunked iterator pattern and passing the query
- * engine (or the bop context) into the iterator construction process (or simply
- * requesting that the query engine construct the iterator stack).
- * <p>
- * In terms of harmonization, it is difficult to say which way would work
- * better. In the short term we could simply allow both and mask the differences
- * in how we construct the filters, but the conversion to/from striterators and
- * chunked iterators seems to waste a bit of effort.
- * <p>
- * The trickiest part of all of this is to allow a distributed filter pattern
- * where the filter gets created on a set of nodes identified by the operator
- * and the elements move among those nodes using the query engine's buffers.
- * <p>
- * To actually implement the distributed distinct filter we need to stack the
- * following:
- *
- * <pre>
- * - ITupleIterator
- * - Resolve ITuple to Element (e.g., SPOC).
- * - Layer on optional IElementFilter associated with the IPredicate.
- * - Layer on SameVariableConstraint iff required (done by AccessPath)
- * - Resolve SPO to SPO, stripping off the context position.
- * - Chunk SPOs (SPO[], IKeyOrder), where the key order is from the access path.
- * - Filter SPO[] using DHT constructed on specified nodes of the cluster.
- * The SPO[] chunks should be packaged into NIO buffers and shipped to those
- * nodes. The results should be shipped back as a bit vectors packaged into
- * a NIO buffers.
- * - Dechunk SPO[] to SPO since that is the current expectation for the filter
- * stack.
- * - The result then gets wrapped as a {@link IChunkedOrderedIterator} by
- * the AccessPath using a {@link ChunkedArrayIterator}.
- * </pre>
- *
- * This stack is a bit complex(!). But it is certainly easy enough to generate
- * the necessary bits programmatically.
- *
- * FIXME Handling the {@link Union} of binding sets. Consider whether the chunk
- * combiner logic from the {@link DistributedJoinTask} could be reused.
- *
- * FIXME INSERT and DELETE which will construct elements using
- * {@link IRelation#newElement(java.util.List, IBindingSet)} from a binding set
- * and then use {@link IMutableRelation#insert(IChunkedOrderedIterator)} and
- * {@link IMutableRelation#delete(IChunkedOrderedIterator)}. For s/o, we first
- * need to move the bits into the right places so it makes sense to unpack the
- * processing of the loop over the elements and move the data around, writing on
- * each index as necessary. There could be eventually consistent approaches to
- * this as well. For justifications we need to update some additional indices,
- * in which case we are stuck going through {@link IRelation} rather than
- * routing data directly or using the {@link IAsynchronousWriteBufferFactory}.
- * For example, we could handle routing and writing in s/o as follows:
- *
- * <pre>
- * INSERT(relation,bindingSets)
- *
- * expands to
- *
- * SEQUENCE(
- * SELECT(s,p,o), // drop bindings that we do not need
- * PARALLEL(
- * INSERT_INDEX(spo), // construct (s,p,o) elements and insert
- * INSERT_INDEX(pos), // construct (p,o,s) elements and insert
- * INSERT_INDEX(osp), // construct (o,s,p) elements and insert
- * ))
- *
- * </pre>
- *
- * The output of the SELECT operator would be automatically mapped against the
- * shards on which the next operators need to write. Since there is a nested
- * PARALLEL operator, the mapping will be against the shards of each of the
- * given indices. (A simpler operator would invoke
- * {@link SPORelation#insert(IChunkedOrderedIterator)}. Handling justifications
- * requires that we also formulate the justification chain from the pattern of
- * variable bindings in the rule).
- *
- * FIXME Handle {@link Program}s. There are three flavors, which should probably
- * be broken into three operators: sequence(ops), set(ops), and closure(op). The
- * 'set' version would be parallelized, or at least have an annotation for
- * parallel evaluation. These things belong in the same broad category as the
- * join graph since they are operators which control the evaluation of other
- * operators (the current pipeline join also has that characteristic which it
- * uses to do the nested index subqueries).
- *
- * FIXME SPARQL to BOP translation
- * <p>
- * The initial pass should translate from {@link IRule} to {@link BOp}s so we
- * can immediately begin running SPARQL queries against the {@link QueryEngine}.
- * A second pass should explore a rules base translation from the openrdf SPARQL
- * operator tree into {@link BOp}s, perhaps using an embedded {@link Prolog}
- * engine. What follows is a partial list of special considerations for that
- * translation:
- * <ul>
- * <li>Distinct can be trivially enforced for default graph queries against the
- * SPOC index.</li>
- * <li>Local distinct should wait until there is more than one tuple from the
- * index since a single tuple does not need to be made distinct using a hash
- * map.</li>
- * <li>Low volume distributed queries should use solution modifiers which
- * evaluate on the query controller node rather than using distributed sort,
- * distinct, slice, or aggregation operators.</li>
- * <li></li>
- * <li></li>
- * <li></li>
- * <li>High volume queries should use special operators (different
- * implementations of joins, use an external merge sort, etc).</li>
- * </ul>
- *
- * FIXME SPARQL Coverage: Add native support for all SPARQL operators. A lot of
- * this can be picked up from Sesame. Some things, such as isIRI() can be done
- * natively against the {@link IV}. Likewise, there is already a set of
- * comparison methods for {@link IV}s which are inlined values. Add support for
- * <ul>
- * <li></li>
- * <li></li>
- * <li></li>
- * <li></li>
- * <li></li>
- * <li></li>
- * </ul>
- *
* @todo Expander patterns will continue to exist until we handle the standalone
* backchainers in a different manner for scale-out so add support for
* those for now.
@@ -536,6 +395,8 @@
if (q.isCancelled())
continue;
final IChunkMessage<IBindingSet> chunk = q.chunksIn.poll();
+ if (chunk == null)
+ continue;
if (log.isTraceEnabled())
log.trace("Accepted chunk: " + chunk);
try {
@@ -820,6 +681,9 @@
*/
protected RunningQuery getRunningQuery(final UUID queryId) {
+ if(queryId == null)
+ throw new IllegalArgumentException();
+
return runningQueries.get(queryId);
}
@@ -868,4 +732,13 @@
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The default implementation is a NOP.
+ */
+ public void cancelQuery(UUID queryId, Throwable cause) {
+ // NOP
+ }
+
}
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -0,0 +1,69 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ * Created on Sep 21, 2010
+ */
+
+package com.bigdata.bop.engine;
+
+import com.bigdata.bop.PipelineOp;
+
+/**
+ * Annotations understood by the {@link QueryEngine} which are used for some
+ * unit tests but which should not be used for real queries.
+ * <p>
+ * Note: This class is in the main source tree because {@link QueryEngine}
+ * references it, but the annotations defined here should only be specified from
+ * within a unit test.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public interface QueryEngineTestAnnotations {
+
+ /**
+ * When <code>true</code>, each chunk will be sent out using its own
+ * {@link IChunkMessage}. Otherwise the {@link QueryEngine} MAY (and
+ * generally does) combine the chunks in the output of a given operator
+ * evaluation pass into a single {@link IChunkMessage} per target query
+ * peer.
+ * <p>
+ * Note: This annotation was introduced to make it easier to control the #of
+ * {@link IChunkMessage}s output from a given operator and thereby diagnose
+ * {@link RunState} termination conditions linked to having multiple
+ * {@link IChunkMessage}s.
+ * <p>
+ * Note: Just controlling the {@link PipelineOp.Annotations#CHUNK_CAPACITY}
+ * and {@link PipelineOp.Annotations#CHUNK_OF_CHUNKS_CAPACITY} is not enough
+ * to force the {@link QueryEngine} to run an operator once per source
+ * chunk. The {@link QueryEngine} normally combines chunks together. You
+ * MUST also specify this annotation in order for the query engine to send
+ * multiple {@link IChunkMessage}s rather than just one.
+ */
+ String ONE_MESSAGE_PER_CHUNK = QueryEngineTestAnnotations.class.getName()
+ + ".oneMessagePerChunk";
+
+ boolean DEFAULT_ONE_MESSAGE_PER_CHUNK = false;
+
+}
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -0,0 +1,98 @@
+package com.bigdata.bop.engine;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.bigdata.relation.accesspath.IAsynchronousIterator;
+
+/**
+ * Delegate pattern cancels the {@link RunningQuery} when the iterator is
+ * {@link #close() closed} and signals normal completion of the query once the
+ * iterator is exhausted.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+class QueryResultIterator<E> implements IAsynchronousIterator<E> {
+
+ private final RunningQuery runningQuery;
+
+ private final IAsynchronousIterator<E> src;
+
+ private final AtomicBoolean open = new AtomicBoolean(true);
+
+ public QueryResultIterator(final RunningQuery runningQuery,
+ final IAsynchronousIterator<E> src) {
+
+ if (runningQuery == null)
+ throw new IllegalArgumentException();
+
+ if (src == null)
+ throw new IllegalArgumentException();
+
+ this.runningQuery = runningQuery;
+
+ this.src = src;
+
+ }
+
+ public void close() {
+ if (open.compareAndSet(true/* expect */, false/* update */)) {
+ try {
+ runningQuery.cancel(true/* mayInterruptIfRunning */);
+ } finally {
+ src.close();
+ }
+ }
+ }
+
+ private void normalCompletion() {
+ if (open.compareAndSet(true/* expect */, false/* update */)) {
+ /*
+ * Note: DO NOT halt the query here!!!! That will cause it to not
+ * accept any more messages. Just close the source iterator.
+ */
+// try {
+// runningQuery.halt();
+// } finally {
+ src.close();
+// }
+ }
+ }
+
+ public boolean isExhausted() {
+// return src.isExhausted();
+ if (src.isExhausted()) {
+ normalCompletion();
+ return true;
+ }
+ return false;
+ }
+
+ public boolean hasNext() {
+// return src.hasNext();
+ if (!src.hasNext()) {
+ normalCompletion();
+ return false;
+ }
+ return true;
+ }
+
+ public boolean hasNext(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return src.hasNext(timeout, unit);
+ }
+
+ public E next(long timeout, TimeUnit unit) throws InterruptedException {
+ return src.next(timeout, unit);
+ }
+
+ public E next() {
+ return src.next();
+ }
+
+ public void remove() {
+ src.remove();
+ }
+
+}
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-23 16:09:13 UTC (rev 3616)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-23 20:10:58 UTC (rev 3617)
@@ -29,11 +29,12 @@
import java.rmi.RemoteException;
import java.util.Arrays;
-import java.util.LinkedHashMap;
+import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,12 +43,10 @@
import org.apache.log4j.Logger;
import com.bigdata.bop.BOp;
-import com.bigdata.util.InnerCause;
+import com.bigdata.relation.accesspath.IBlockingBuffer;
/**
- * The run state for a {@link RunningQuery}. This class is NOT thread-safe.
- * {@link RunningQuery} uses an internal lock to serialize requests against the
- * public methods of this class.
+ * The run state for a {@link RunningQuery}. This class is thread-safe.
*
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
* @version $Id$
@@ -66,19 +65,63 @@
}
/**
- * Note: Due to concurrency, it is possible for an {@link IChunkMessage} to
- * be accepted and the corresponding chunk task started, before a
- * {@link RunState#startOp(StartOpMessage)} transition has been fully
- * processed. This means that the {@link RunState#totalAvailableChunkCount}
- * can become transiently negative. This flag disables asserts which would
- * otherwise fail on legal transient negatives.
+ * Message if the query has already started evaluation.
*/
- static private boolean availableChunkCountMayBeNegative = true;
+ static private final transient String ERR_QUERY_STARTED = "Query already running.";
+
+ /**
+ * Message if query evaluation has already halted.
+ */
+ static private final transient String ERR_QUERY_HALTED = "Query already halted.";
+
+ /**
+ * Message if an operator addressed by a {@link HaltOpMessage} was never started.
+ */
+ static private final transient String ERR_OP_NOT_STARTED = "Operator never ran.";
+
+ /**
+ * Message if an operator addressed by a message has been halted.
+ */
+ static private final transient String ERR_OP_HALTED = "Operator is not running.";
+
+ /**
+ * Message if a query deadline has been exceeded.
+ */
+ static private final transient String ERR_DEADLINE = "Query deadline is expired.";
+
+ /**
+ * {@link RunningQuery#handleOutputChunk(BOp, int, IBlockingBuffer)} drops
+ * {@link IChunkMessage}s onto {@link RunningQuery#chunksIn} and drops the
+ * {@link RunningQuery} on {@link QueryEngine#runningQueries} as soon as
+ * output {@link IChunkMessage}s are generated. A {@link IChunkMessage} MAY
+ * be taken for evaluation as soon as it is published. This means that the
+ * operator which will consume that {@link IChunkMessage} can begin to
+ * execute <em>before</em> {@link RunningQuery#haltOp(HaltOpMessage)} is
+ * invoked to indicate the end of the operator which produced that
+ * {@link IChunkMessage}.
+ * <p>
+ * This is all fine. However, due to the potential overlap in these
+ * schedules {@link RunState#totalAvailableCount} can become transiently
+ * negative. This flag disables asserts which would otherwise fail on legal
+ * transient negatives.
+ */
+ static private final boolean availableMessageCountMayBeNegative = true;
/**
+ * Flag may be used to turn on stderr output.
+ */
+ static private final boolean debug = true;
+
+ /**
* The query.
*/
- private final RunningQuery query;
+ private final BOp query;
+
+ /**
+ * An index from {@link BOp.Annotations#BOP_ID} to {@link BOp} for the
+ * {@link #query}.
+ */
+ private final Map<Integer,BOp> bopIndex;
/**
* The query identifier.
@@ -94,36 +137,42 @@
private final long deadline;
/**
+ * Set to <code>true</code> iff the query evaluation has begun.
+ *
+ * @see #startQuery(IChunkMessage)
+ */
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ /**
* Set to <code>true</code> iff the query evaluation is complete due to
* normal termination.
- * <p>
- * Note: This is package private to expose it to {@link RunningQuery}.
*
* @see #haltOp(HaltOpMessage)
*/
- /*private*/ final AtomicBoolean allDone = new AtomicBoolean(false);
+ private final AtomicBoolean allDone = new AtomicBoolean(false);
/**
* The #of run state transitions which have occurred for this query.
*/
- private long nsteps = 0;
+ private final AtomicLong nsteps = new AtomicLong();
/**
* The #of tasks for this query which have started but not yet halted.
*/
- private long totalRunningTaskCount = 0;
+ private final AtomicLong totalRunningCount = new AtomicLong();
/**
- * The #of chunks for this query of which a running task has made available
- * but which have not yet been accepted for processing by another task.
+ * The #of {@link IChunkMessage}s for the query which a running task has made
+ * available but which have not yet been accepted for processing by another
+ * task.
*/
- private long totalAvailableChunkCount = 0;
+ private final AtomicLong totalAvailableCount = new AtomicLong();
/**
- * A map reporting the #of chunks available for each operator in the
- * pipeline (we only report chunks for pipeline operators). The total #of
- * chunks available across all operators in the pipeline is reported by
- * {@link #totalAvailableChunkCount}.
+ * A map reporting the #of {@link IChunkMessage}s available for each operator
+ * in the pipeline. The total #of {@link IChunkMessage}s available across
+ * all operators in the pipeline is reported by {@link #totalAvailableCount}
+ * .
* <p>
* The movement of the intermediate binding set chunks forms an acyclic
* directed graph. This map is used to track the #of chunks available for
@@ -132,62 +181,166 @@
* {@link BOp} had executed informing the {@link QueryEngine} on that node
* that it should immediately release all resources associated with that
* {@link BOp}.
+ * <p>
+ * Note: This collection is package private in order to expose its state to
+ * the unit tests. Since the map contains {@link AtomicLong}s it cannot be
+ * readily exposed as a {@link Map} object. If we were to expose the map, it
+ * would have to be via a get(key) style interface.
*/
- private final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableChunkCountMap = new LinkedHashMap<Integer, AtomicLong>();
+ /* private */final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableMap = new ConcurrentHashMap<Integer, AtomicLong>();
/**
* A collection reporting on the #of instances of a given {@link BOp} which
* are concurrently executing.
+ * <p>
+ * Note: This collection is package private in order to expose its state to
+ * the unit tests. Since the map contains {@link AtomicLong}s it cannot be
+ * readily exposed as a {@link Map} object. If we were to expose the map, it
+ * would have to be via a get(key) style interface.
*/
- private final Map<Integer/* bopId */, AtomicLong/* runningCount */> runningTaskCountMap = new LinkedHashMap<Integer, AtomicLong>();
+ /* private */final Map<Integer/* bopId */, AtomicLong/* runningCount */> runningMap = new ConcurrentHashMap<Integer, AtomicLong>();
/**
* A collection of the operators which have executed at least once.
*/
private final Set<Integer/* bopId */> startedSet = new LinkedHashSet<Integer>();
- public RunState(final RunningQuery query) {
+ /**
+ * Return the query identifier specified to the constructor.
+ */
+ final public UUID getQueryId() {
+ return queryId;
+ }
- this.query = query;
+ /**
+ * Return the deadline specified to the constructor.
+ */
+ final public long getDeadline() {
+ return deadline;
+ }
- this.queryId = query.getQueryId();
+ /**
+ * Return <code>true</code> if evaluation of the query has been initiated
+ * using {@link #startQuery(IChunkMessage)}.
+ */
+ final public boolean isStarted() {
+ return started.get();
+ }
- this.deadline = query.getDeadline();
-
- // this.nops = query.bopIndex.size();
+ /**
+ * Return <code>true</code> if the query is known to be completed based on
+ * the {@link #haltOp(HaltOpMessage)}.
+ */
+ final public boolean isAllDone() {
+ return allDone.get();
+ }
+ /**
+ * The #of run state transitions which have occurred for this query.
+ */
+ final public long getStepCount() {
+ return nsteps.get();
}
- public void startQuery(final IChunkMessage<?> msg) {
+ /**
+ * The #...
[truncated message content] |