From: <tho...@us...> - 2010-09-05 20:17:51
|
Revision: 3510 http://bigdata.svn.sourceforge.net/bigdata/?rev=3510&view=rev Author: thompsonbry Date: 2010-09-05 20:17:44 +0000 (Sun, 05 Sep 2010) Log Message: ----------- Introduced an interface for running queries and refactored the BOpContext to use that interface. This makes it possible to test more things with mock objects and also provides a hook from the BOpContext back to the RunningQuery. That hook is necessary when we need to halt a running query (for an offset/limit slice). Added Rule2BOpUtility which will handle the initial conversion from a Rule or Program into a bop tree. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Rule2BOpUtility.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-05 18:16:01 UTC (rev 3509) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -27,20 +27,18 @@ */ package com.bigdata.bop; -import java.util.Iterator; - import org.apache.log4j.Logger; import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.engine.RunningQuery; import com.bigdata.bop.solutions.SliceOp; import com.bigdata.btree.IIndex; import com.bigdata.btree.ILocalBTreeView; import com.bigdata.btree.IRangeQuery; import com.bigdata.journal.IIndexManager; -import com.bigdata.journal.ITx; import com.bigdata.journal.TimestampUtility; -import com.bigdata.mdi.PartitionLocator; import com.bigdata.relation.IRelation; import com.bigdata.relation.accesspath.AccessPath; import com.bigdata.relation.accesspath.IAccessPath; @@ -49,11 +47,10 @@ import com.bigdata.relation.locator.IResourceLocator; import com.bigdata.relation.rule.IRule; import com.bigdata.relation.rule.eval.IJoinNexus; -import com.bigdata.service.AbstractScaleOutFederation; import com.bigdata.service.DataService; import com.bigdata.service.IBigdataFederation; -import com.bigdata.service.ndx.IClientIndex; import com.bigdata.striterator.IKeyOrder; +import com.ibm.icu.impl.ByteBuffer; /** * The evaluation context for the operator (NOT serializable). @@ -65,14 +62,16 @@ static private final Logger log = Logger.getLogger(BOpContext.class); - private final IBigdataFederation<?> fed; + private final IRunningQuery runningQuery; + +// private final IBigdataFederation<?> fed; +// +// private final IIndexManager indexManager; +// +// private final long readTimestamp; +// +// private final long writeTimestamp; - private final IIndexManager indexManager; - - private final long readTimestamp; - - private final long writeTimestamp; - private final int partitionId; private final BOpStats stats; @@ -84,13 +83,25 @@ private final IBlockingBuffer<E[]> sink2; /** + * The interface for a running query. + * <p> + * Note: In scale-out each node will have a distinct {@link IRunningQuery} + * object and the query controller will have access to additional state, + * such as the aggregation of the {@link BOpStats} for the query on all + * nodes. + */ + public IRunningQuery getRunningQuery() { + return runningQuery; + } + + /** * The {@link IBigdataFederation} IFF the operator is being evaluated on an * {@link IBigdataFederation}. When evaluating operations against an * {@link IBigdataFederation}, this reference provides access to the * scale-out view of the indices and to other bigdata services. */ public IBigdataFederation<?> getFederation() { - return fed; + return runningQuery.getFederation(); } /** @@ -100,7 +111,7 @@ * {@link ILocalBTreeView}. */ public final IIndexManager getIndexManager() { - return indexManager; + return runningQuery.getIndexManager(); } /** @@ -108,7 +119,7 @@ * reading. */ public final long getReadTimestamp() { - return readTimestamp; + return runningQuery.getReadTimestamp(); } /** @@ -116,7 +127,7 @@ * writing. */ public final long getWriteTimestamp() { - return writeTimestamp; + return runningQuery.getWriteTimestamp(); } /** @@ -137,6 +148,22 @@ /** * Where to read the data to be consumed by the operator. + * + * @todo Since joins now run from locally materialized data in all cases the + * API could be simplified somewhat given that we know that there will + * be a single "source" chunk of binding sets. Also, the reason for + * the {@link IAsynchronousIterator} here is that a downstream join + * could error (or satisfy a slice) and halt the upstream joins. That + * is being coordinated through the {@link RunningQuery} now. + * <p> + * It is not yet clear what the right API is for the source. The + * iterator model might be just fine, but might not need to be + * asynchronous and does not need to be closeable. + * <p> + * Perhaps the right thing is to expose an object with a richer API + * for obtaining various kinds of iterators or even access to the + * direct {@link ByteBuffer}s backing the data (for high volume joins, + * exernal merge sorts, etc). */ public final IAsynchronousIterator<E[]> getSource() { return source; @@ -194,18 +221,6 @@ * failed joins outside of the join group. * * @throws IllegalArgumentException - * if the <i>indexManager</i> is <code>null</code> - * @throws IllegalArgumentException - * if the <i>indexManager</i> is is not a <em>local</em> index - * manager. - * @throws IllegalArgumentException - * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED} - * (queries may not read on the unisolated indices). - * @throws IllegalArgumentException - * if the <i>writeTimestamp</i> is neither - * {@link ITx#UNISOLATED} nor a read-write transaction - * identifier. - * @throws IllegalArgumentException * if the <i>stats</i> is <code>null</code> * @throws IllegalArgumentException * if the <i>source</i> is <code>null</code> (use an empty @@ -213,37 +228,47 @@ * @throws IllegalArgumentException * if the <i>sink</i> is <code>null</code> */ - public BOpContext(final IBigdataFederation<?> fed, - final IIndexManager indexManager, final long readTimestamp, - final long writeTimestamp, final int partitionId, +// * @throws IllegalArgumentException +// * if the <i>indexManager</i> is <code>null</code> +// * @throws IllegalArgumentException +// * if the <i>indexManager</i> is is not a <em>local</em> index +// * manager. +// * @throws IllegalArgumentException +// * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED} +// * (queries may not read on the unisolated indices). +// * @throws IllegalArgumentException +// * if the <i>writeTimestamp</i> is neither +// * {@link ITx#UNISOLATED} nor a read-write transaction +// * identifier. + public BOpContext(final IRunningQuery runningQuery,final int partitionId, final BOpStats stats, final IAsynchronousIterator<E[]> source, final IBlockingBuffer<E[]> sink, final IBlockingBuffer<E[]> sink2) { - if (indexManager == null) - throw new IllegalArgumentException(); - if (indexManager instanceof IBigdataFederation<?>) { - /* - * This is disallowed because the predicate specifies an index - * partition and expects to have access to the local index objects - * for that index partition. - */ - throw new IllegalArgumentException( - "Expecting a local index manager, not: " - + indexManager.getClass().toString()); - } - if (readTimestamp == ITx.UNISOLATED) - throw new IllegalArgumentException(); - if (TimestampUtility.isReadOnly(writeTimestamp)) - throw new IllegalArgumentException(); + this.runningQuery = runningQuery; +// if (indexManager == null) +// throw new IllegalArgumentException(); +// if (indexManager instanceof IBigdataFederation<?>) { +// /* +// * This is disallowed because predicates always read on local index +// * objects, even in scale-out. +// */ +// throw new IllegalArgumentException( +// "Expecting a local index manager, not: " +// + indexManager.getClass().toString()); +// } +// if (readTimestamp == ITx.UNISOLATED) +// throw new IllegalArgumentException(); +// if (TimestampUtility.isReadOnly(writeTimestamp)) +// throw new IllegalArgumentException(); if (stats == null) throw new IllegalArgumentException(); if (source == null) throw new IllegalArgumentException(); if (sink == null) throw new IllegalArgumentException(); - this.fed = fed; // may be null - this.indexManager = indexManager; - this.readTimestamp = readTimestamp; - this.writeTimestamp = writeTimestamp; +// this.fed = fed; // may be null +// this.indexManager = indexManager; +// this.readTimestamp = readTimestamp; +// this.writeTimestamp = writeTimestamp; this.partitionId = partitionId; this.stats = stats; this.source = source; @@ -344,24 +369,28 @@ if (partitionId == -1) { - if(indexManager instanceof IBigdataFederation<?>) + if (getFederation() != null) { + // This is scale-out so the partition identifier is required. throw new UnsupportedOperationException(); + } // The index is not partitioned. - ndx = (ILocalBTreeView) indexManager.getIndex(namespace + "." + ndx = (ILocalBTreeView) getIndexManager().getIndex(namespace + "." + keyOrder.getIndexName(), getWriteTimestamp()); } else { - if(!(indexManager instanceof IBigdataFederation<?>)) + if (getFederation() == null) { + // This is not scale-out so index partitions are not supported. throw new UnsupportedOperationException(); + } // The name of the desired index partition. final String name = DataService.getIndexPartitionName(namespace + "." + keyOrder.getIndexName(), partitionId); // MUST be a local index view. - ndx = (ILocalBTreeView) indexManager.getIndex(name, + ndx = (ILocalBTreeView) getIndexManager().getIndex(name, getWriteTimestamp()); } @@ -427,6 +456,10 @@ PipelineOp.Annotations.FULLY_BUFFERED_READ_THRESHOLD, PipelineOp.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD); + final IIndexManager indexManager = getIndexManager(); + + final long readTimestamp = getReadTimestamp(); + if (predicate.getPartitionId() != -1) { /* @@ -669,6 +702,8 @@ * with that thrown cause. */ public void halt() { + + runningQuery.halt(); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-05 18:16:01 UTC (rev 3509) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -101,9 +101,6 @@ */ private static final long serialVersionUID = 1L; - /** - * @todo Declare SLICE annotation and support SLICE in the {@link JoinTask}. - */ public interface Annotations extends BindingSetPipelineOp.Annotations { /** @@ -478,11 +475,6 @@ this.optional = joinOp.isOptional(); this.variablesToKeep = joinOp.variablesToKeep(); this.context = context; - /* - * FIXME Carefully review which index manager (local versus fed) is - * being used to resolve the relation. Also note that we used to - * cache the resourceLocator. - */ this.relation = context.getReadRelation(right); this.source = context.getSource(); this.sink = context.getSink(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java 2010-09-05 18:16:01 UTC (rev 3509) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -29,7 +29,6 @@ import java.util.LinkedList; import java.util.List; -import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; @@ -47,11 +46,10 @@ import com.bigdata.bop.Var; import com.bigdata.bop.constraint.EQConstant; import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.MockRunningQuery; import com.bigdata.bop.engine.TestQueryEngine; import com.bigdata.bop.solutions.DistinctBindingSetOp; -import com.bigdata.journal.BufferMode; import com.bigdata.journal.ITx; -import com.bigdata.journal.Journal; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; @@ -79,25 +77,25 @@ super(name); } - @Override - public Properties getProperties() { +// @Override +// public Properties getProperties() { +// +// final Properties p = new Properties(super.getProperties()); +// +// p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient +// .toString()); +// +// return p; +// +// } - final Properties p = new Properties(super.getProperties()); +// Journal jnl = null; - p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient - .toString()); - - return p; - - } - - Journal jnl = null; - List<IBindingSet> data = null; public void setUp() throws Exception { - jnl = new Journal(getProperties()); +// jnl = new Journal(getProperties()); setUpData(); @@ -147,10 +145,10 @@ public void tearDown() throws Exception { - if (jnl != null) { - jnl.destroy(); - jnl = null; - } +// if (jnl != null) { +// jnl.destroy(); +// jnl = null; +// } // clear reference. data = null; @@ -212,16 +210,18 @@ final IBlockingBuffer<IBindingSet[]> sink2 = query.newBuffer(); final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>( - null/* fed */, jnl/* indexManager */, + new MockRunningQuery( + null/* fed */, null/* indexManager */, ITx.READ_COMMITTED/* readTimestamp */, - ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats, + ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats, source, sink, sink2); // get task. final FutureTask<Void> ft = query.eval(context); // execute task. - jnl.getExecutorService().execute(ft); +// jnl.getExecutorService().execute(ft); + ft.run(); TestQueryEngine.assertSameSolutions(expected, sink.iterator()); TestQueryEngine.assertSameSolutions(expected2, sink2.iterator()); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -0,0 +1,75 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +/* + * Created on Sep 5, 2010 + */ + +package com.bigdata.bop.engine; + +import com.bigdata.btree.ILocalBTreeView; +import com.bigdata.journal.IIndexManager; +import com.bigdata.service.IBigdataFederation; + +/** + * Interface exposing a limited set of the state of an executing query. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface IRunningQuery { + + /** + * The {@link IBigdataFederation} IFF the operator is being evaluated on an + * {@link IBigdataFederation}. When evaluating operations against an + * {@link IBigdataFederation}, this reference provides access to the + * scale-out view of the indices and to other bigdata services. + */ + IBigdataFederation<?> getFederation(); + + /** + * The <strong>local</strong> {@link IIndexManager}. Query evaluation occurs + * against the local indices. In scale-out, query evaluation proceeds shard + * wise and this {@link IIndexManager} MUST be able to read on the + * {@link ILocalBTreeView}. + */ + IIndexManager getIndexManager(); + + /** + * The timestamp or transaction identifier against which the query is + * reading. + */ + long getReadTimestamp(); + + /** + * The timestamp or transaction identifier against which the query is + * writing. + */ + long getWriteTimestamp(); + + /** + * Terminate query evaluation + */ + void halt(); + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -0,0 +1,96 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 5, 2010 + */ + +package com.bigdata.bop.engine; + +import org.apache.log4j.Logger; + +import com.bigdata.journal.IIndexManager; +import com.bigdata.service.IBigdataFederation; + +/** + * Mock object. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class MockRunningQuery implements IRunningQuery { + + private static final Logger log = Logger.getLogger(MockRunningQuery.class); + + private final IBigdataFederation<?> fed; + + private final IIndexManager indexManager; + + private final long readTimestamp; + + private final long writeTimestamp; + + /** + * Note: This constructor DOES NOT check its arguments so unit tests may be + * written with the minimum dependencies + * + * @param fed + * @param indexManager + * @param readTimestamp + * @param writeTimestamp + */ + public MockRunningQuery(final IBigdataFederation<?> fed, + final IIndexManager indexManager, final long readTimestamp, + final long writeTimestamp) { + + this.fed = fed; + this.indexManager = indexManager; + this.readTimestamp = readTimestamp; + this.writeTimestamp = writeTimestamp; + + } + + public IBigdataFederation<?> getFederation() { + return fed; + } + + public IIndexManager getIndexManager() { + return indexManager; + } + + public long getReadTimestamp() { + return readTimestamp; + } + + public long getWriteTimestamp() { + return writeTimestamp; + } + + /** + * NOP (you have to test things like slices with a full integration). + */ + public void halt() { + log.warn("Mock object does not implement halt()"); + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java 2010-09-05 18:16:01 UTC (rev 3509) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -41,10 +41,12 @@ import org.apache.log4j.Logger; +import com.bigdata.bop.BOp; import com.bigdata.bop.BindingSetPipelineOp; -import com.bigdata.bop.BOp; import com.bigdata.bop.IBindingSet; import com.bigdata.journal.IIndexManager; +import com.bigdata.journal.ITx; +import com.bigdata.journal.TimestampUtility; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.service.IBigdataFederation; @@ -580,6 +582,13 @@ * evaluating the query. * * @throws Exception + * @throws IllegalArgumentException + * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED} + * (queries may not read on the unisolated indices). + * @throws IllegalArgumentException + * if the <i>writeTimestamp</i> is neither + * {@link ITx#UNISOLATED} nor a read-write transaction + * identifier. * * @todo Consider elevating the read/write timestamps into the query plan as * annotations. Closure would then rewrite the query plan for each @@ -597,6 +606,10 @@ if (query == null) throw new IllegalArgumentException(); + if (readTimestamp == ITx.UNISOLATED) + throw new IllegalArgumentException(); + if (TimestampUtility.isReadOnly(writeTimestamp)) + throw new IllegalArgumentException(); final long timeout = query.getProperty(BOp.Annotations.TIMEOUT, BOp.Annotations.DEFAULT_TIMEOUT); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Rule2BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Rule2BOpUtility.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Rule2BOpUtility.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -0,0 +1,93 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 5, 2010 + */ + +package com.bigdata.bop.engine; + +import com.bigdata.bop.BOp; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.relation.rule.IProgram; +import com.bigdata.relation.rule.IRule; +import com.bigdata.relation.rule.IStep; +import com.bigdata.relation.rule.Program; +import com.bigdata.relation.rule.Rule; + +/** + * Utility class converts {@link IRule}s to {@link BOp}s. + * <p> + * Note: This is a stopgap measure designed to allow us to evaluate SPARQL + * queries and verify the standalone {@link QueryEngine} while we develop a + * direct translation from Sesame's SPARQL operator tree onto {@link BOp}s and + * work on the scale-out query buffer transfer mechanisms. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class Rule2BOpUtility { + + /** + * Convert an {@link IStep} into an operator tree. This should handle + * {@link IRule}s and {@link IProgram}s as they are currently implemented + * and used by the {@link BigdataSail}. + * + * @param step + * The step. + * + * @return + */ + public static BOp convert(final IStep step) { + + throw new UnsupportedOperationException(); + + } + + /** + * Convert a rule into an operator tree. + * + * @param rule + * + * @return + */ + public static BOp convert(final Rule rule) { + + throw new UnsupportedOperationException(); + + } + + /** + * Convert a program into an operator tree. + * + * @param program + * + * @return + */ + public static BOp convert(final Program program) { + + throw new UnsupportedOperationException(); + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Rule2BOpUtility.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java 2010-09-05 18:16:01 UTC (rev 3509) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -59,6 +59,8 @@ import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.bset.Union; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.journal.IIndexManager; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.spo.SPORelation; import com.bigdata.relation.IMutableRelation; @@ -69,8 +71,9 @@ import com.bigdata.relation.accesspath.IElementFilter; import com.bigdata.relation.rule.IRule; import com.bigdata.relation.rule.Program; -import com.bigdata.relation.rule.eval.pipeline.DistributedJoinMasterTask; +import com.bigdata.relation.rule.eval.pipeline.DistributedJoinTask; import com.bigdata.resources.ResourceManager; +import com.bigdata.service.IBigdataFederation; import com.bigdata.service.ndx.IAsynchronousWriteBufferFactory; import com.bigdata.striterator.ChunkedArrayIterator; import com.bigdata.striterator.IChunkedOrderedIterator; @@ -82,7 +85,7 @@ * * @todo HA aspects of running queries? Checkpoints for long running queries? */ -public class RunningQuery implements Future<Map<Integer,BOpStats>> { +public class RunningQuery implements Future<Map<Integer,BOpStats>>, IRunningQuery { private final static transient Logger log = Logger .getLogger(RunningQuery.class); @@ -887,10 +890,8 @@ final IBlockingBuffer<IBindingSet[]> altSink = altSinkId == null ? null : op.newBuffer(); // context - final BOpContext context = new BOpContext(queryEngine.getFederation(), - queryEngine.getLocalIndexManager(), readTimestamp, - writeTimestamp, chunk.partitionId, op.newStats(), chunk.source, - sink, altSink); + final BOpContext context = new BOpContext(this, chunk.partitionId, op + .newStats(), chunk.source, sink, altSink); // FutureTask for operator execution (not running yet). final FutureTask<Void> f = op.eval(context); // Hook the FutureTask. @@ -976,9 +977,21 @@ * various methods in order to clean up the state of a completed query. */ + public void halt() { + + cancel(true/* mayInterruptIfRunning */); + + } + /** - * @todo Cancelled queries must reject or drop new chunks, etc. Queries must - * release all of their resources when they are done(). + * @todo Cancelled queries must reject or drop new chunks, etc. + * <p> + * Queries must release all of their resources when they are done(). + * <p> + * Queries MUST NOT cause the solutions to be discarded before the + * client can consume them. This means that we have to carefully + * integrate/product {@link SliceOp} or just wrap the query buffer to + * impose the slice (simpler). */ final public boolean cancel(final boolean mayInterruptIfRunning) { // halt the query. @@ -1028,4 +1041,20 @@ } + public IBigdataFederation<?> getFederation() { + return queryEngine.getFederation(); + } + + public IIndexManager getIndexManager() { + return queryEngine.getLocalIndexManager(); + } + + public long getReadTimestamp() { + return readTimestamp; + } + + public long getWriteTimestamp() { + return writeTimestamp; + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-05 18:16:01 UTC (rev 3509) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -33,10 +33,10 @@ import junit.framework.TestCase2; -import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.ArrayBindingSet; import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.Constant; import com.bigdata.bop.HashBindingSet; import com.bigdata.bop.IBindingSet; @@ -59,6 +59,7 @@ import com.bigdata.service.jini.JiniFederation; import com.bigdata.striterator.ChunkedArrayIterator; import com.bigdata.striterator.ICloseableIterator; +import com.ibm.icu.impl.ByteBuffer; /** * Test suite for the {@link QueryEngine} against a local database instance. @@ -389,6 +390,12 @@ * the {@link PipelineDelayOp} can be used to impose sufficient * latency on the pipeline that the test can close the query buffer * iterator first]. + * <p> + * This must also be tested in scale-out to make sure that the data + * backing the solutions is not discarded before the caller can use + * those data. [This could be handled by materializing binding set + * objects out of a {@link ByteBuffer} rather than using a live decode + * of the data in that {@link ByteBuffer}.] */ public void test_query_closeIterator() { @@ -397,6 +404,26 @@ } /** + * @todo Test ability to impose a limit/offset slice on a query. + * <p> + * Note: While the logic for visiting only the solutions selected by + * the slice can be tested against a mock object, the integration by + * which a slice halts a query when it is satisfied has to be tested + * against a {@link QueryEngine}. + * <p> + * This must also be tested in scale-out to make sure that the data + * backing the solutions is not discarded before the caller can use + * those data. [This could be handled by materializing binding set + * objects out of a {@link ByteBuffer} rather than using a live decode + * of the data in that {@link ByteBuffer}.] + */ + public void test_query_slice() { + + fail("write test"); + + } + + /** * @todo Test the ability run a query reading on an access path using a * element filter (other than DISTINCT). */ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-09-05 18:16:01 UTC (rev 3509) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -51,6 +51,7 @@ import com.bigdata.bop.ap.R; import com.bigdata.bop.bset.CopyBindingSetOp; import com.bigdata.bop.constraint.INBinarySearch; +import com.bigdata.bop.engine.MockRunningQuery; import com.bigdata.bop.engine.TestQueryEngine; import com.bigdata.bop.join.PipelineJoin.PipelineJoinStats; import com.bigdata.journal.BufferMode; @@ -215,9 +216,9 @@ final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer(); final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>( - null/* fed */, jnl/* indexManager */, + new MockRunningQuery(null/* fed */, jnl/* indexManager */, ITx.READ_COMMITTED/* readTimestamp */, - ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats, + ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats, source, sink, null/* sink2 */); // get task. @@ -312,9 +313,9 @@ final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer(); final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>( - null/* fed */, jnl/* indexManager */, + new MockRunningQuery(null/* fed */, jnl/* indexManager */, ITx.READ_COMMITTED/* readTimestamp */, - ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats, + ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats, source, sink, null/* sink2 */); // get task. @@ -435,9 +436,9 @@ final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer(); final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>( - null/* fed */, jnl/* indexManager */, + new MockRunningQuery(null/* fed */, jnl/* indexManager */, ITx.READ_COMMITTED/* readTimestamp */, - ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats, + ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats, source, sink, null/* sink2 */); // get task. @@ -553,9 +554,9 @@ final PipelineJoinStats stats = query.newStats(); final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>( - null/* fed */, jnl/* indexManager */, + new MockRunningQuery(null/* fed */, jnl/* indexManager */, ITx.READ_COMMITTED/* readTimestamp */, - ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats, + ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats, source, sink, null/* sink2 */); // get task. @@ -675,9 +676,9 @@ final PipelineJoinStats stats = query.newStats(); final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>( - null/* fed */, jnl/* indexManager */, + new MockRunningQuery(null/* fed */, jnl/* indexManager */, ITx.READ_COMMITTED/* readTimestamp */, - ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats, + ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats, source, sink, sink2); // get task. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java 2010-09-05 18:16:01 UTC (rev 3509) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -29,7 +29,6 @@ import java.util.LinkedList; import java.util.List; -import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; @@ -46,11 +45,9 @@ import com.bigdata.bop.NV; import com.bigdata.bop.Var; import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.MockRunningQuery; import com.bigdata.bop.engine.TestQueryEngine; -import com.bigdata.bop.solutions.DistinctBindingSetOp; -import com.bigdata.journal.BufferMode; import com.bigdata.journal.ITx; -import com.bigdata.journal.Journal; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; @@ -78,25 +75,25 @@ super(name); } - @Override - public Properties getProperties() { +// @Override +// public Properties getProperties() { +// +// final Properties p = new Properties(super.getProperties()); +// +// p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient +// .toString()); +// +// return p; +// +// } - final Properties p = new Properties(super.getProperties()); +// Journal jnl = null; - p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient - .toString()); - - return p; - - } - - Journal jnl = null; - List<IBindingSet> data = null; public void setUp() throws Exception { - jnl = new Journal(getProperties()); +// jnl = new Journal(getProperties()); setUpData(); @@ -153,11 +150,11 @@ public void tearDown() throws Exception { - if (jnl != null) { - jnl.destroy(); - jnl = null; - } - +// if (jnl != null) { +// jnl.destroy(); +// jnl = null; +// } +// // clear reference. data = null; @@ -208,16 +205,17 @@ final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer(); final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>( - null/* fed */, jnl/* indexManager */, + new MockRunningQuery(null/* fed */, null/* indexManager */, ITx.READ_COMMITTED/* readTimestamp */, - ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats, + ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats, source, sink, null/* sink2 */); // get task. final FutureTask<Void> ft = query.eval(context); // execute task. - jnl.getExecutorService().execute(ft); +// jnl.getExecutorService().execute(ft); + ft.run(); TestQueryEngine.assertSameSolutions(expected, sink.iterator()); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java 2010-09-05 18:16:01 UTC (rev 3509) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java 2010-09-05 20:17:44 UTC (rev 3510) @@ -28,7 +28,6 @@ package com.bigdata.bop.solutions; import java.util.ArrayList; -import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; @@ -45,11 +44,9 @@ import com.bigdata.bop.NV; import com.bigdata.bop.Var; import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.MockRunningQuery; import com.bigdata.bop.engine.TestQueryEngine; -import com.bigdata.bop.solutions.SliceOp; -import com.bigdata.journal.BufferMode; import com.bigdata.journal.ITx; -import com.bigdata.journal.Journal; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; @@ -75,25 +72,25 @@ super(name); } - @Override - public Properties getProperties() { +// @Override +// public Properties getProperties() { +// +// final Properties p = new Properties(super.getProperties()); +// +// p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient +// .toString()); +// +// return p; +// +// } +// +// Journal jnl = null; - final Properties p = new Properties(super.getProperties()); - - p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient - .toString()); - - return p; - - } - - Journal jnl = null; - ArrayList<IBindingSet> data; public void setUp() throws Exception { - jnl = new Journal(getProperties()); +// jnl = new Journal(getProperties()); setUpData(); @@ -150,10 +147,10 @@ public void tearDown() throws Exception { - if (jnl != null) { - jnl.destroy(); - jnl = null; - } +// if (jnl != null) { +// jnl.destroy(); +// jnl = null; +// } // clear reference. data = null; @@ -208,16 +205,17 @@ final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer(); final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>( - null/* fed */, jnl/* indexManager */, + new MockRunningQuery(null/* fed */, null/* indexManager */, ITx.READ_COMMITTED/* readTimestamp */, - ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats, + ITx.UNISOLATED/* writeTimestamp */), -1/* partitionId */, stats, source, sink, null/* sink2 */); // get task. final FutureTask<Void> ft = query.eval(context); // execute task. - jnl.getExecutorService().execute(ft); +// jnl.getExecutorService().execute(ft); + ft.run(); TestQueryEngine.assertSameSolutions(expected, sink.iterator()); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |