|
From: <tho...@us...> - 2010-09-05 20:17:51
|
Revision: 3510
http://bigdata.svn.sourceforge.net/bigdata/?rev=3510&view=rev
Author: thompsonbry
Date: 2010-09-05 20:17:44 +0000 (Sun, 05 Sep 2010)
Log Message:
-----------
Introduced an interface for running queries and refactored the BOpContext to use that interface. This makes it possible to test more things with mock objects and also provides a hook from the BOpContext back to the RunningQuery. That hook is necessary when we need to halt a running query (for an offset/limit slice).
Added Rule2BOpUtility which will handle the initial conversion from a Rule or Program into a bop tree.
Modified Paths:
--------------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java
Added Paths:
-----------
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Rule2BOpUtility.java
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-05 18:16:01 UTC (rev 3509)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -27,20 +27,18 @@
*/
package com.bigdata.bop;
-import java.util.Iterator;
-
import org.apache.log4j.Logger;
import com.bigdata.bop.engine.BOpStats;
+import com.bigdata.bop.engine.IRunningQuery;
import com.bigdata.bop.engine.QueryEngine;
+import com.bigdata.bop.engine.RunningQuery;
import com.bigdata.bop.solutions.SliceOp;
import com.bigdata.btree.IIndex;
import com.bigdata.btree.ILocalBTreeView;
import com.bigdata.btree.IRangeQuery;
import com.bigdata.journal.IIndexManager;
-import com.bigdata.journal.ITx;
import com.bigdata.journal.TimestampUtility;
-import com.bigdata.mdi.PartitionLocator;
import com.bigdata.relation.IRelation;
import com.bigdata.relation.accesspath.AccessPath;
import com.bigdata.relation.accesspath.IAccessPath;
@@ -49,11 +47,10 @@
import com.bigdata.relation.locator.IResourceLocator;
import com.bigdata.relation.rule.IRule;
import com.bigdata.relation.rule.eval.IJoinNexus;
-import com.bigdata.service.AbstractScaleOutFederation;
import com.bigdata.service.DataService;
import com.bigdata.service.IBigdataFederation;
-import com.bigdata.service.ndx.IClientIndex;
import com.bigdata.striterator.IKeyOrder;
+import com.ibm.icu.impl.ByteBuffer;
/**
* The evaluation context for the operator (NOT serializable).
@@ -65,14 +62,16 @@
static private final Logger log = Logger.getLogger(BOpContext.class);
- private final IBigdataFederation<?> fed;
+ private final IRunningQuery runningQuery;
+
+// private final IBigdataFederation<?> fed;
+//
+// private final IIndexManager indexManager;
+//
+// private final long readTimestamp;
+//
+// private final long writeTimestamp;
- private final IIndexManager indexManager;
-
- private final long readTimestamp;
-
- private final long writeTimestamp;
-
private final int partitionId;
private final BOpStats stats;
@@ -84,13 +83,25 @@
private final IBlockingBuffer<E[]> sink2;
/**
+ * The interface for a running query.
+ * <p>
+ * Note: In scale-out each node will have a distinct {@link IRunningQuery}
+ * object and the query controller will have access to additional state,
+ * such as the aggregation of the {@link BOpStats} for the query on all
+ * nodes.
+ */
+ public IRunningQuery getRunningQuery() {
+ return runningQuery;
+ }
+
+ /**
* The {@link IBigdataFederation} IFF the operator is being evaluated on an
* {@link IBigdataFederation}. When evaluating operations against an
* {@link IBigdataFederation}, this reference provides access to the
* scale-out view of the indices and to other bigdata services.
*/
public IBigdataFederation<?> getFederation() {
- return fed;
+ return runningQuery.getFederation();
}
/**
@@ -100,7 +111,7 @@
* {@link ILocalBTreeView}.
*/
public final IIndexManager getIndexManager() {
- return indexManager;
+ return runningQuery.getIndexManager();
}
/**
@@ -108,7 +119,7 @@
* reading.
*/
public final long getReadTimestamp() {
- return readTimestamp;
+ return runningQuery.getReadTimestamp();
}
/**
@@ -116,7 +127,7 @@
* writing.
*/
public final long getWriteTimestamp() {
- return writeTimestamp;
+ return runningQuery.getWriteTimestamp();
}
/**
@@ -137,6 +148,22 @@
/**
* Where to read the data to be consumed by the operator.
+ *
+ * @todo Since joins now run from locally materialized data in all cases the
+ * API could be simplified somewhat given that we know that there will
+ * be a single "source" chunk of binding sets. Also, the reason for
+ * the {@link IAsynchronousIterator} here is that a downstream join
+ * could error (or satisfy a slice) and halt the upstream joins. That
+ * is being coordinated through the {@link RunningQuery} now.
+ * <p>
+ * It is not yet clear what the right API is for the source. The
+ * iterator model might be just fine, but might not need to be
+ * asynchronous and does not need to be closeable.
+ * <p>
+ * Perhaps the right thing is to expose an object with a richer API
+ * for obtaining various kinds of iterators or even access to the
+ * direct {@link ByteBuffer}s backing the data (for high volume joins,
+ * external merge sorts, etc).
*/
public final IAsynchronousIterator<E[]> getSource() {
return source;
@@ -194,18 +221,6 @@
* failed joins outside of the join group.
*
* @throws IllegalArgumentException
- * if the <i>indexManager</i> is <code>null</code>
- * @throws IllegalArgumentException
- * if the <i>indexManager</i> is is not a <em>local</em> index
- * manager.
- * @throws IllegalArgumentException
- * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED}
- * (queries may not read on the unisolated indices).
- * @throws IllegalArgumentException
- * if the <i>writeTimestamp</i> is neither
- * {@link ITx#UNISOLATED} nor a read-write transaction
- * identifier.
- * @throws IllegalArgumentException
* if the <i>stats</i> is <code>null</code>
* @throws IllegalArgumentException
* if the <i>source</i> is <code>null</code> (use an empty
@@ -213,37 +228,47 @@
* @throws IllegalArgumentException
* if the <i>sink</i> is <code>null</code>
*/
- public BOpContext(final IBigdataFederation<?> fed,
- final IIndexManager indexManager, final long readTimestamp,
- final long writeTimestamp, final int partitionId,
+// * @throws IllegalArgumentException
+// * if the <i>indexManager</i> is <code>null</code>
+// * @throws IllegalArgumentException
+// * if the <i>indexManager</i> is not a <em>local</em> index
+// * manager.
+// * @throws IllegalArgumentException
+// * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED}
+// * (queries may not read on the unisolated indices).
+// * @throws IllegalArgumentException
+// * if the <i>writeTimestamp</i> is neither
+// * {@link ITx#UNISOLATED} nor a read-write transaction
+// * identifier.
+ public BOpContext(final IRunningQuery runningQuery,final int partitionId,
final BOpStats stats, final IAsynchronousIterator<E[]> source,
final IBlockingBuffer<E[]> sink, final IBlockingBuffer<E[]> sink2) {
- if (indexManager == null)
- throw new IllegalArgumentException();
- if (indexManager instanceof IBigdataFederation<?>) {
- /*
- * This is disallowed because the predicate specifies an index
- * partition and expects to have access to the local index objects
- * for that index partition.
- */
- throw new IllegalArgumentException(
- "Expecting a local index manager, not: "
- + indexManager.getClass().toString());
- }
- if (readTimestamp == ITx.UNISOLATED)
- throw new IllegalArgumentException();
- if (TimestampUtility.isReadOnly(writeTimestamp))
- throw new IllegalArgumentException();
+ this.runningQuery = runningQuery;
+// if (indexManager == null)
+// throw new IllegalArgumentException();
+// if (indexManager instanceof IBigdataFederation<?>) {
+// /*
+// * This is disallowed because predicates always read on local index
+// * objects, even in scale-out.
+// */
+// throw new IllegalArgumentException(
+// "Expecting a local index manager, not: "
+// + indexManager.getClass().toString());
+// }
+// if (readTimestamp == ITx.UNISOLATED)
+// throw new IllegalArgumentException();
+// if (TimestampUtility.isReadOnly(writeTimestamp))
+// throw new IllegalArgumentException();
if (stats == null)
throw new IllegalArgumentException();
if (source == null)
throw new IllegalArgumentException();
if (sink == null)
throw new IllegalArgumentException();
- this.fed = fed; // may be null
- this.indexManager = indexManager;
- this.readTimestamp = readTimestamp;
- this.writeTimestamp = writeTimestamp;
+// this.fed = fed; // may be null
+// this.indexManager = indexManager;
+// this.readTimestamp = readTimestamp;
+// this.writeTimestamp = writeTimestamp;
this.partitionId = partitionId;
this.stats = stats;
this.source = source;
@@ -344,24 +369,28 @@
if (partitionId == -1) {
- if(indexManager instanceof IBigdataFederation<?>)
+ if (getFederation() != null) {
+ // This is scale-out so the partition identifier is required.
throw new UnsupportedOperationException();
+ }
// The index is not partitioned.
- ndx = (ILocalBTreeView) indexManager.getIndex(namespace + "."
+ ndx = (ILocalBTreeView) getIndexManager().getIndex(namespace + "."
+ keyOrder.getIndexName(), getWriteTimestamp());
} else {
- if(!(indexManager instanceof IBigdataFederation<?>))
+ if (getFederation() == null) {
+ // This is not scale-out so index partitions are not supported.
throw new UnsupportedOperationException();
+ }
// The name of the desired index partition.
final String name = DataService.getIndexPartitionName(namespace
+ "." + keyOrder.getIndexName(), partitionId);
// MUST be a local index view.
- ndx = (ILocalBTreeView) indexManager.getIndex(name,
+ ndx = (ILocalBTreeView) getIndexManager().getIndex(name,
getWriteTimestamp());
}
@@ -427,6 +456,10 @@
PipelineOp.Annotations.FULLY_BUFFERED_READ_THRESHOLD,
PipelineOp.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD);
+ final IIndexManager indexManager = getIndexManager();
+
+ final long readTimestamp = getReadTimestamp();
+
if (predicate.getPartitionId() != -1) {
/*
@@ -669,6 +702,8 @@
* with that thrown cause.
*/
public void halt() {
+
+ runningQuery.halt();
}
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-05 18:16:01 UTC (rev 3509)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -101,9 +101,6 @@
*/
private static final long serialVersionUID = 1L;
- /**
- * @todo Declare SLICE annotation and support SLICE in the {@link JoinTask}.
- */
public interface Annotations extends BindingSetPipelineOp.Annotations {
/**
@@ -478,11 +475,6 @@
this.optional = joinOp.isOptional();
this.variablesToKeep = joinOp.variablesToKeep();
this.context = context;
- /*
- * FIXME Carefully review which index manager (local versus fed) is
- * being used to resolve the relation. Also note that we used to
- * cache the resourceLocator.
- */
this.relation = context.getReadRelation(right);
this.source = context.getSource();
this.sink = context.getSink();
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java 2010-09-05 18:16:01 UTC (rev 3509)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -29,7 +29,6 @@
import java.util.LinkedList;
import java.util.List;
-import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
@@ -47,11 +46,10 @@
import com.bigdata.bop.Var;
import com.bigdata.bop.constraint.EQConstant;
import com.bigdata.bop.engine.BOpStats;
+import com.bigdata.bop.engine.MockRunningQuery;
import com.bigdata.bop.engine.TestQueryEngine;
import com.bigdata.bop.solutions.DistinctBindingSetOp;
-import com.bigdata.journal.BufferMode;
import com.bigdata.journal.ITx;
-import com.bigdata.journal.Journal;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.relation.accesspath.IBlockingBuffer;
import com.bigdata.relation.accesspath.ThickAsynchronousIterator;
@@ -79,25 +77,25 @@
super(name);
}
- @Override
- public Properties getProperties() {
+// @Override
+// public Properties getProperties() {
+//
+// final Properties p = new Properties(super.getProperties());
+//
+// p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient
+// .toString());
+//
+// return p;
+//
+// }
- final Properties p = new Properties(super.getProperties());
+// Journal jnl = null;
- p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient
- .toString());
-
- return p;
-
- }
-
- Journal jnl = null;
-
List<IBindingSet> data = null;
public void setUp() throws Exception {
- jnl = new Journal(getProperties());
+// jnl = new Journal(getProperties());
setUpData();
@@ -147,10 +145,10 @@
public void tearDown() throws Exception {
- if (jnl != null) {
- jnl.destroy();
- jnl = null;
- }
+// if (jnl != null) {
+// jnl.destroy();
+// jnl = null;
+// }
// clear reference.
data = null;
@@ -212,16 +210,18 @@
final IBlockingBuffer<IBindingSet[]> sink2 = query.newBuffer();
final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
- null/* fed */, jnl/* indexManager */,
+ new MockRunningQuery(
+ null/* fed */, null/* indexManager */,
ITx.READ_COMMITTED/* readTimestamp */,
- ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats,
+ ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats,
source, sink, sink2);
// get task.
final FutureTask<Void> ft = query.eval(context);
// execute task.
- jnl.getExecutorService().execute(ft);
+// jnl.getExecutorService().execute(ft);
+ ft.run();
TestQueryEngine.assertSameSolutions(expected, sink.iterator());
TestQueryEngine.assertSameSolutions(expected2, sink2.iterator());
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -0,0 +1,75 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+/*
+ * Created on Sep 5, 2010
+ */
+
+package com.bigdata.bop.engine;
+
+import com.bigdata.btree.ILocalBTreeView;
+import com.bigdata.journal.IIndexManager;
+import com.bigdata.service.IBigdataFederation;
+
+/**
+ * Interface exposing a limited set of the state of an executing query.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public interface IRunningQuery {
+
+ /**
+ * The {@link IBigdataFederation} IFF the operator is being evaluated on an
+ * {@link IBigdataFederation}. When evaluating operations against an
+ * {@link IBigdataFederation}, this reference provides access to the
+ * scale-out view of the indices and to other bigdata services.
+ */
+ IBigdataFederation<?> getFederation();
+
+ /**
+ * The <strong>local</strong> {@link IIndexManager}. Query evaluation occurs
+ * against the local indices. In scale-out, query evaluation proceeds shard
+ * wise and this {@link IIndexManager} MUST be able to read on the
+ * {@link ILocalBTreeView}.
+ */
+ IIndexManager getIndexManager();
+
+ /**
+ * The timestamp or transaction identifier against which the query is
+ * reading.
+ */
+ long getReadTimestamp();
+
+ /**
+ * The timestamp or transaction identifier against which the query is
+ * writing.
+ */
+ long getWriteTimestamp();
+
+ /**
+ * Terminate query evaluation.
+ */
+ void halt();
+
+}
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -0,0 +1,96 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ * Created on Sep 5, 2010
+ */
+
+package com.bigdata.bop.engine;
+
+import org.apache.log4j.Logger;
+
+import com.bigdata.journal.IIndexManager;
+import com.bigdata.service.IBigdataFederation;
+
+/**
+ * Mock object.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public class MockRunningQuery implements IRunningQuery {
+
+ private static final Logger log = Logger.getLogger(MockRunningQuery.class);
+
+ private final IBigdataFederation<?> fed;
+
+ private final IIndexManager indexManager;
+
+ private final long readTimestamp;
+
+ private final long writeTimestamp;
+
+ /**
+ * Note: This constructor DOES NOT check its arguments so unit tests may be
+ * written with the minimum dependencies.
+ *
+ * @param fed
+ * @param indexManager
+ * @param readTimestamp
+ * @param writeTimestamp
+ */
+ public MockRunningQuery(final IBigdataFederation<?> fed,
+ final IIndexManager indexManager, final long readTimestamp,
+ final long writeTimestamp) {
+
+ this.fed = fed;
+ this.indexManager = indexManager;
+ this.readTimestamp = readTimestamp;
+ this.writeTimestamp = writeTimestamp;
+
+ }
+
+ public IBigdataFederation<?> getFederation() {
+ return fed;
+ }
+
+ public IIndexManager getIndexManager() {
+ return indexManager;
+ }
+
+ public long getReadTimestamp() {
+ return readTimestamp;
+ }
+
+ public long getWriteTimestamp() {
+ return writeTimestamp;
+ }
+
+ /**
+ * NOP (you have to test things like slices with a full integration).
+ */
+ public void halt() {
+ log.warn("Mock object does not implement halt()");
+ }
+
+}
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java 2010-09-05 18:16:01 UTC (rev 3509)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -41,10 +41,12 @@
import org.apache.log4j.Logger;
+import com.bigdata.bop.BOp;
import com.bigdata.bop.BindingSetPipelineOp;
-import com.bigdata.bop.BOp;
import com.bigdata.bop.IBindingSet;
import com.bigdata.journal.IIndexManager;
+import com.bigdata.journal.ITx;
+import com.bigdata.journal.TimestampUtility;
import com.bigdata.relation.accesspath.IBlockingBuffer;
import com.bigdata.service.IBigdataFederation;
@@ -580,6 +582,13 @@
* evaluating the query.
*
* @throws Exception
+ * @throws IllegalArgumentException
+ * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED}
+ * (queries may not read on the unisolated indices).
+ * @throws IllegalArgumentException
+ * if the <i>writeTimestamp</i> is neither
+ * {@link ITx#UNISOLATED} nor a read-write transaction
+ * identifier.
*
* @todo Consider elevating the read/write timestamps into the query plan as
* annotations. Closure would then rewrite the query plan for each
@@ -597,6 +606,10 @@
if (query == null)
throw new IllegalArgumentException();
+ if (readTimestamp == ITx.UNISOLATED)
+ throw new IllegalArgumentException();
+ if (TimestampUtility.isReadOnly(writeTimestamp))
+ throw new IllegalArgumentException();
final long timeout = query.getProperty(BOp.Annotations.TIMEOUT,
BOp.Annotations.DEFAULT_TIMEOUT);
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Rule2BOpUtility.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Rule2BOpUtility.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Rule2BOpUtility.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -0,0 +1,93 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ * Created on Sep 5, 2010
+ */
+
+package com.bigdata.bop.engine;
+
+import com.bigdata.bop.BOp;
+import com.bigdata.rdf.sail.BigdataSail;
+import com.bigdata.relation.rule.IProgram;
+import com.bigdata.relation.rule.IRule;
+import com.bigdata.relation.rule.IStep;
+import com.bigdata.relation.rule.Program;
+import com.bigdata.relation.rule.Rule;
+
+/**
+ * Utility class converts {@link IRule}s to {@link BOp}s.
+ * <p>
+ * Note: This is a stopgap measure designed to allow us to evaluate SPARQL
+ * queries and verify the standalone {@link QueryEngine} while we develop a
+ * direct translation from Sesame's SPARQL operator tree onto {@link BOp}s and
+ * work on the scale-out query buffer transfer mechanisms.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public class Rule2BOpUtility {
+
+ /**
+ * Convert an {@link IStep} into an operator tree. This should handle
+ * {@link IRule}s and {@link IProgram}s as they are currently implemented
+ * and used by the {@link BigdataSail}.
+ *
+ * @param step
+ * The step.
+ *
+ * @return
+ */
+ public static BOp convert(final IStep step) {
+
+ throw new UnsupportedOperationException();
+
+ }
+
+ /**
+ * Convert a rule into an operator tree.
+ *
+ * @param rule
+ *
+ * @return
+ */
+ public static BOp convert(final Rule rule) {
+
+ throw new UnsupportedOperationException();
+
+ }
+
+ /**
+ * Convert a program into an operator tree.
+ *
+ * @param program
+ *
+ * @return
+ */
+ public static BOp convert(final Program program) {
+
+ throw new UnsupportedOperationException();
+
+ }
+
+}
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Rule2BOpUtility.java
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java 2010-09-05 18:16:01 UTC (rev 3509)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -59,6 +59,8 @@
import com.bigdata.bop.NoSuchBOpException;
import com.bigdata.bop.ap.Predicate;
import com.bigdata.bop.bset.Union;
+import com.bigdata.bop.solutions.SliceOp;
+import com.bigdata.journal.IIndexManager;
import com.bigdata.rdf.internal.IV;
import com.bigdata.rdf.spo.SPORelation;
import com.bigdata.relation.IMutableRelation;
@@ -69,8 +71,9 @@
import com.bigdata.relation.accesspath.IElementFilter;
import com.bigdata.relation.rule.IRule;
import com.bigdata.relation.rule.Program;
-import com.bigdata.relation.rule.eval.pipeline.DistributedJoinMasterTask;
+import com.bigdata.relation.rule.eval.pipeline.DistributedJoinTask;
import com.bigdata.resources.ResourceManager;
+import com.bigdata.service.IBigdataFederation;
import com.bigdata.service.ndx.IAsynchronousWriteBufferFactory;
import com.bigdata.striterator.ChunkedArrayIterator;
import com.bigdata.striterator.IChunkedOrderedIterator;
@@ -82,7 +85,7 @@
*
* @todo HA aspects of running queries? Checkpoints for long running queries?
*/
-public class RunningQuery implements Future<Map<Integer,BOpStats>> {
+public class RunningQuery implements Future<Map<Integer,BOpStats>>, IRunningQuery {
private final static transient Logger log = Logger
.getLogger(RunningQuery.class);
@@ -887,10 +890,8 @@
final IBlockingBuffer<IBindingSet[]> altSink = altSinkId == null ? null
: op.newBuffer();
// context
- final BOpContext context = new BOpContext(queryEngine.getFederation(),
- queryEngine.getLocalIndexManager(), readTimestamp,
- writeTimestamp, chunk.partitionId, op.newStats(), chunk.source,
- sink, altSink);
+ final BOpContext context = new BOpContext(this, chunk.partitionId, op
+ .newStats(), chunk.source, sink, altSink);
// FutureTask for operator execution (not running yet).
final FutureTask<Void> f = op.eval(context);
// Hook the FutureTask.
@@ -976,9 +977,21 @@
* various methods in order to clean up the state of a completed query.
*/
+ public void halt() {
+
+ cancel(true/* mayInterruptIfRunning */);
+
+ }
+
/**
- * @todo Cancelled queries must reject or drop new chunks, etc. Queries must
- * release all of their resources when they are done().
+ * @todo Cancelled queries must reject or drop new chunks, etc.
+ * <p>
+ * Queries must release all of their resources when they are done().
+ * <p>
+ * Queries MUST NOT cause the solutions to be discarded before the
+ * client can consume them. This means that we have to carefully
+ * integrate/product {@link SliceOp} or just wrap the query buffer to
+ * impose the slice (simpler).
*/
final public boolean cancel(final boolean mayInterruptIfRunning) {
// halt the query.
@@ -1028,4 +1041,20 @@
}
+ public IBigdataFederation<?> getFederation() {
+ return queryEngine.getFederation();
+ }
+
+ public IIndexManager getIndexManager() {
+ return queryEngine.getLocalIndexManager();
+ }
+
+ public long getReadTimestamp() {
+ return readTimestamp;
+ }
+
+ public long getWriteTimestamp() {
+ return writeTimestamp;
+ }
+
}
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-05 18:16:01 UTC (rev 3509)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -33,10 +33,10 @@
import junit.framework.TestCase2;
-import com.bigdata.bop.BindingSetPipelineOp;
import com.bigdata.bop.ArrayBindingSet;
import com.bigdata.bop.BOp;
import com.bigdata.bop.BOpContext;
+import com.bigdata.bop.BindingSetPipelineOp;
import com.bigdata.bop.Constant;
import com.bigdata.bop.HashBindingSet;
import com.bigdata.bop.IBindingSet;
@@ -59,6 +59,7 @@
import com.bigdata.service.jini.JiniFederation;
import com.bigdata.striterator.ChunkedArrayIterator;
import com.bigdata.striterator.ICloseableIterator;
+import com.ibm.icu.impl.ByteBuffer;
/**
* Test suite for the {@link QueryEngine} against a local database instance.
@@ -389,6 +390,12 @@
* the {@link PipelineDelayOp} can be used to impose sufficient
* latency on the pipeline that the test can close the query buffer
* iterator first].
+ * <p>
+ * This must also be tested in scale-out to make sure that the data
+ * backing the solutions is not discarded before the caller can use
+ * those data. [This could be handled by materializing binding set
+ * objects out of a {@link ByteBuffer} rather than using a live decode
+ * of the data in that {@link ByteBuffer}.]
*/
public void test_query_closeIterator() {
@@ -397,6 +404,26 @@
}
/**
+ * @todo Test ability to impose a limit/offset slice on a query.
+ * <p>
+ * Note: While the logic for visiting only the solutions selected by
+ * the slice can be tested against a mock object, the integration by
+ * which a slice halts a query when it is satisfied has to be tested
+ * against a {@link QueryEngine}.
+ * <p>
+ * This must also be tested in scale-out to make sure that the data
+ * backing the solutions is not discarded before the caller can use
+ * those data. [This could be handled by materializing binding set
+ * objects out of a {@link ByteBuffer} rather than using a live decode
+ * of the data in that {@link ByteBuffer}.]
+ */
+ public void test_query_slice() {
+
+ fail("write test");
+
+ }
+
+ /**
* @todo Test the ability run a query reading on an access path using a
* element filter (other than DISTINCT).
*/
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-09-05 18:16:01 UTC (rev 3509)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -51,6 +51,7 @@
import com.bigdata.bop.ap.R;
import com.bigdata.bop.bset.CopyBindingSetOp;
import com.bigdata.bop.constraint.INBinarySearch;
+import com.bigdata.bop.engine.MockRunningQuery;
import com.bigdata.bop.engine.TestQueryEngine;
import com.bigdata.bop.join.PipelineJoin.PipelineJoinStats;
import com.bigdata.journal.BufferMode;
@@ -215,9 +216,9 @@
final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer();
final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
- null/* fed */, jnl/* indexManager */,
+ new MockRunningQuery(null/* fed */, jnl/* indexManager */,
ITx.READ_COMMITTED/* readTimestamp */,
- ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats,
+ ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats,
source, sink, null/* sink2 */);
// get task.
@@ -312,9 +313,9 @@
final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer();
final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
- null/* fed */, jnl/* indexManager */,
+ new MockRunningQuery(null/* fed */, jnl/* indexManager */,
ITx.READ_COMMITTED/* readTimestamp */,
- ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats,
+ ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats,
source, sink, null/* sink2 */);
// get task.
@@ -435,9 +436,9 @@
final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer();
final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
- null/* fed */, jnl/* indexManager */,
+ new MockRunningQuery(null/* fed */, jnl/* indexManager */,
ITx.READ_COMMITTED/* readTimestamp */,
- ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats,
+ ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats,
source, sink, null/* sink2 */);
// get task.
@@ -553,9 +554,9 @@
final PipelineJoinStats stats = query.newStats();
final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
- null/* fed */, jnl/* indexManager */,
+ new MockRunningQuery(null/* fed */, jnl/* indexManager */,
ITx.READ_COMMITTED/* readTimestamp */,
- ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats,
+ ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats,
source, sink, null/* sink2 */);
// get task.
@@ -675,9 +676,9 @@
final PipelineJoinStats stats = query.newStats();
final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
- null/* fed */, jnl/* indexManager */,
+ new MockRunningQuery(null/* fed */, jnl/* indexManager */,
ITx.READ_COMMITTED/* readTimestamp */,
- ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats,
+ ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats,
source, sink, sink2);
// get task.
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java 2010-09-05 18:16:01 UTC (rev 3509)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -29,7 +29,6 @@
import java.util.LinkedList;
import java.util.List;
-import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
@@ -46,11 +45,9 @@
import com.bigdata.bop.NV;
import com.bigdata.bop.Var;
import com.bigdata.bop.engine.BOpStats;
+import com.bigdata.bop.engine.MockRunningQuery;
import com.bigdata.bop.engine.TestQueryEngine;
-import com.bigdata.bop.solutions.DistinctBindingSetOp;
-import com.bigdata.journal.BufferMode;
import com.bigdata.journal.ITx;
-import com.bigdata.journal.Journal;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.relation.accesspath.IBlockingBuffer;
import com.bigdata.relation.accesspath.ThickAsynchronousIterator;
@@ -78,25 +75,25 @@
super(name);
}
- @Override
- public Properties getProperties() {
+// @Override
+// public Properties getProperties() {
+//
+// final Properties p = new Properties(super.getProperties());
+//
+// p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient
+// .toString());
+//
+// return p;
+//
+// }
- final Properties p = new Properties(super.getProperties());
+// Journal jnl = null;
- p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient
- .toString());
-
- return p;
-
- }
-
- Journal jnl = null;
-
List<IBindingSet> data = null;
public void setUp() throws Exception {
- jnl = new Journal(getProperties());
+// jnl = new Journal(getProperties());
setUpData();
@@ -153,11 +150,11 @@
public void tearDown() throws Exception {
- if (jnl != null) {
- jnl.destroy();
- jnl = null;
- }
-
+// if (jnl != null) {
+// jnl.destroy();
+// jnl = null;
+// }
+//
// clear reference.
data = null;
@@ -208,16 +205,17 @@
final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer();
final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
- null/* fed */, jnl/* indexManager */,
+ new MockRunningQuery(null/* fed */, null/* indexManager */,
ITx.READ_COMMITTED/* readTimestamp */,
- ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats,
+ ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats,
source, sink, null/* sink2 */);
// get task.
final FutureTask<Void> ft = query.eval(context);
// execute task.
- jnl.getExecutorService().execute(ft);
+// jnl.getExecutorService().execute(ft);
+ ft.run();
TestQueryEngine.assertSameSolutions(expected, sink.iterator());
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java 2010-09-05 18:16:01 UTC (rev 3509)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java 2010-09-05 20:17:44 UTC (rev 3510)
@@ -28,7 +28,6 @@
package com.bigdata.bop.solutions;
import java.util.ArrayList;
-import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
@@ -45,11 +44,9 @@
import com.bigdata.bop.NV;
import com.bigdata.bop.Var;
import com.bigdata.bop.engine.BOpStats;
+import com.bigdata.bop.engine.MockRunningQuery;
import com.bigdata.bop.engine.TestQueryEngine;
-import com.bigdata.bop.solutions.SliceOp;
-import com.bigdata.journal.BufferMode;
import com.bigdata.journal.ITx;
-import com.bigdata.journal.Journal;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.relation.accesspath.IBlockingBuffer;
import com.bigdata.relation.accesspath.ThickAsynchronousIterator;
@@ -75,25 +72,25 @@
super(name);
}
- @Override
- public Properties getProperties() {
+// @Override
+// public Properties getProperties() {
+//
+// final Properties p = new Properties(super.getProperties());
+//
+// p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient
+// .toString());
+//
+// return p;
+//
+// }
+//
+// Journal jnl = null;
- final Properties p = new Properties(super.getProperties());
-
- p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient
- .toString());
-
- return p;
-
- }
-
- Journal jnl = null;
-
ArrayList<IBindingSet> data;
public void setUp() throws Exception {
- jnl = new Journal(getProperties());
+// jnl = new Journal(getProperties());
setUpData();
@@ -150,10 +147,10 @@
public void tearDown() throws Exception {
- if (jnl != null) {
- jnl.destroy();
- jnl = null;
- }
+// if (jnl != null) {
+// jnl.destroy();
+// jnl = null;
+// }
// clear reference.
data = null;
@@ -208,16 +205,17 @@
final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer();
final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
- null/* fed */, jnl/* indexManager */,
+ new MockRunningQuery(null/* fed */, null/* indexManager */,
ITx.READ_COMMITTED/* readTimestamp */,
- ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats,
+ ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats,
source, sink, null/* sink2 */);
// get task.
final FutureTask<Void> ft = query.eval(context);
// execute task.
- jnl.getExecutorService().execute(ft);
+// jnl.getExecutorService().execute(ft);
+ ft.run();
TestQueryEngine.assertSameSolutions(expected, sink.iterator());
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|