From: <tho...@us...> - 2010-10-07 19:05:24
|
Revision: 3745 http://bigdata.svn.sourceforge.net/bigdata/?rev=3745&view=rev Author: thompsonbry Date: 2010-10-07 19:05:15 +0000 (Thu, 07 Oct 2010) Log Message: ----------- Finished UNION and integrated it into the SAIL. The new decision tree rewrites of the rules to bops is now enabled by default. Groups the UNION, STEPS, STAR (not yet implemented), and JoinGraph (not yet implemented) operators under a "controller" package. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/NV.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestJoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestUnion.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestUnionBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/eval/ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -234,6 +234,8 @@ * {@link Annotations#TIMESTAMP} associated with that operation. * * @see #TIMESTAMP + * + * @todo Move to {@link IPredicate}? 
*/ String MUTATION = BOp.class.getName() + ".mutation"; @@ -244,6 +246,8 @@ * reads or writes on the database (no default). * * @see #MUTATION + * + * @todo Move to {@link IPredicate}? */ String TIMESTAMP = BOp.class.getName() + ".timestamp"; @@ -256,6 +260,18 @@ BOpEvaluationContext DEFAULT_EVALUATION_CONTEXT = BOpEvaluationContext.ANY; /** + * A boolean annotation whose value indicates whether or not this is a + * control operator (default {@value #DEFAULT_CONTROLLER}). A control + * operator is an operator which will issue subqueries for its + * arguments. Thus control operators mark a boundary in pipelined + * evaluation. Some examples of control operators include UNION, STEPS, + * and STAR (aka transitive closure). + */ + String CONTROLLER = BOp.class.getName()+".controller"; + + boolean DEFAULT_CONTROLLER = false; + + /** * For hash partitioned operators, this is the set of the member nodes * for the operator. * <p> Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -610,6 +610,8 @@ * long gone. The problem with specifying a hashCode() and equals() method * for BOp/BOpBase/Predicate is that we wind up with duplicate bop * exceptions being reported by BOpUtility#getIndex(BOp). + * + * Note: Both Var and Constant override hashCode() and equals(). 
*/ // /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -116,6 +116,8 @@ /** * Where to write the output of the operator. + * + * @see PipelineOp.Annotations#SINK_REF */ public final IBlockingBuffer<E[]> getSink() { return sink; @@ -125,6 +127,8 @@ * Optional alternative sink for the output of the operator. This is used by * things like SPARQL optional joins to route failed joins outside of the * join group. + * + * @see PipelineOp.Annotations#ALT_SINK_REF */ public final IBlockingBuffer<E[]> getSink2() { return sink2; @@ -180,18 +184,6 @@ * When doing that, modify to automatically track the {@link BOpStats} * as the <i>source</i> is consumed. */ -// * @throws IllegalArgumentException -// * if the <i>indexManager</i> is <code>null</code> -// * @throws IllegalArgumentException -// * if the <i>indexManager</i> is is not a <em>local</em> index -// * manager. -// * @throws IllegalArgumentException -// * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED} -// * (queries may not read on the unisolated indices). -// * @throws IllegalArgumentException -// * if the <i>writeTimestamp</i> is neither -// * {@link ITx#UNISOLATED} nor a read-write transaction -// * identifier. 
public BOpContext(final IRunningQuery runningQuery,final int partitionId, final BOpStats stats, final IAsynchronousIterator<E[]> source, final IBlockingBuffer<E[]> sink, final IBlockingBuffer<E[]> sink2) { @@ -199,31 +191,12 @@ super(runningQuery.getFederation(), runningQuery.getIndexManager()); this.runningQuery = runningQuery; -// if (indexManager == null) -// throw new IllegalArgumentException(); -// if (indexManager instanceof IBigdataFederation<?>) { -// /* -// * This is disallowed because predicates always read on local index -// * objects, even in scale-out. -// */ -// throw new IllegalArgumentException( -// "Expecting a local index manager, not: " -// + indexManager.getClass().toString()); -// } -// if (readTimestamp == ITx.UNISOLATED) -// throw new IllegalArgumentException(); -// if (TimestampUtility.isReadOnly(writeTimestamp)) -// throw new IllegalArgumentException(); if (stats == null) throw new IllegalArgumentException(); if (source == null) throw new IllegalArgumentException(); if (sink == null) throw new IllegalArgumentException(); -// this.fed = fed; // may be null -// this.indexManager = indexManager; -// this.readTimestamp = readTimestamp; -// this.writeTimestamp = writeTimestamp; this.partitionId = partitionId; this.stats = stats; this.source = source; @@ -266,7 +239,7 @@ if (constraints != null) { // verify constraint. - return isConsistent(constraints, bindings); + return BOpUtility.isConsistent(constraints, bindings); } @@ -339,49 +312,6 @@ } - /** - * Check constraints. - * - * @param constraints - * @param bindingSet - * - * @return <code>true</code> iff the constraints are satisfied. 
- */ - public boolean isConsistent(final IConstraint[] constraints, - final IBindingSet bindingSet) { - - for (int i = 0; i < constraints.length; i++) { - - final IConstraint constraint = constraints[i]; - - if (!constraint.accept(bindingSet)) { - - if (log.isDebugEnabled()) { - - log.debug("Rejected by " - + constraint.getClass().getSimpleName() + " : " - + bindingSet); - - } - - return false; - - } - - if (log.isTraceEnabled()) { - - log.debug("Accepted by " - + constraint.getClass().getSimpleName() + " : " - + bindingSet); - - } - - } - - return true; - - } - // /** // * Cancel the running query (normal termination). // * <p> @@ -455,4 +385,40 @@ // // } + /** + * Copy data from the source to the sink. The sink will be flushed and + * closed. The source will be closed. + */ + public void copySourceToSink() { + + // source. + final IAsynchronousIterator<IBindingSet[]> source = (IAsynchronousIterator) getSource(); + + // default sink + final IBlockingBuffer<IBindingSet[]> sink = (IBlockingBuffer) getSink(); + + final BOpStats stats = getStats(); + + try { + + // copy binding sets from the source. + BOpUtility.copy(source, sink, null/* sink2 */, + null/* constraints */, stats); + + // flush the sink. 
+ sink.flush(); + + } finally { + + sink.close(); + + if (sink2 != null) + sink2.close(); + + source.close(); + + } + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -27,6 +27,7 @@ package com.bigdata.bop; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; @@ -35,9 +36,13 @@ import java.util.List; import java.util.Map; +import org.apache.log4j.Logger; + import com.bigdata.bop.BOp.Annotations; import com.bigdata.bop.engine.BOpStats; import com.bigdata.btree.AbstractNode; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; import cutthecrap.utils.striterators.Expander; import cutthecrap.utils.striterators.Filter; @@ -52,7 +57,7 @@ */ public class BOpUtility { -// private static final Logger log = Logger.getLogger(BOpUtility.class); + private static final Logger log = Logger.getLogger(BOpUtility.class); /** * Pre-order recursive visitation of the operator tree (arguments only, no @@ -468,9 +473,51 @@ } + /** + * Return the left-deep child of the operator, halting at a leaf or earlier + * if a control operator is found. + * + * @param op + * The operator. + * + * @return The child where pipeline evaluation should begin. + * + * @throws IllegalArgumentException + * if the argument is <code>null</code>. + * + * @todo This does not protect against loops in the operator tree. + * + * @todo unit tests. 
+ */ + static public BOp getPipelineStart(BOp op) { + if (op == null) + throw new IllegalArgumentException(); + + while (true) { + if (op.getProperty(BOp.Annotations.CONTROLLER, + BOp.Annotations.DEFAULT_CONTROLLER)) { + // Halt at a control operator. + return op; + } + if(op.arity()==0) { + // No children. + return op; + } + final BOp left = op.get(0); + if (left == null) { + // Halt at a leaf. + return op; + } + // Descend through the left child. + op = left; + } + + } + /** - * Combine chunks drawn from an iterator into a single chunk. + * Combine chunks drawn from an iterator into a single chunk. This is useful + * when materializing intermediate results for an all-at-once operator. * * @param itr * The iterator @@ -511,20 +558,34 @@ } if (nchunks == 0) { + return new IBindingSet[0]; + } else if (nchunks == 1) { + return list.get(0); + } else { + int n = 0; + final IBindingSet[] a = new IBindingSet[nelements]; + final Iterator<IBindingSet[]> itr2 = list.iterator(); + while (itr2.hasNext()) { + final IBindingSet[] t = itr2.next(); + System.arraycopy(t/* src */, 0/* srcPos */, a/* dest */, n/* destPos */, t.length/* length */); + n += t.length; + } + return a; + } } // toArray() @@ -618,4 +679,160 @@ // // } + /** + * Check constraints. + * + * @param constraints + * @param bindingSet + * + * @return <code>true</code> iff the constraints are satisfied. 
+ */ + static public boolean isConsistent(final IConstraint[] constraints, + final IBindingSet bindingSet) { + + for (int i = 0; i < constraints.length; i++) { + + final IConstraint constraint = constraints[i]; + + if (!constraint.accept(bindingSet)) { + + if (log.isDebugEnabled()) { + + log.debug("Rejected by " + + constraint.getClass().getSimpleName() + " : " + + bindingSet); + + } + + return false; + + } + + if (log.isTraceEnabled()) { + + log.debug("Accepted by " + + constraint.getClass().getSimpleName() + " : " + + bindingSet); + + } + + } + + return true; + + } + + /** + * Copy binding sets from the source to the sink(s). + * + * @param source + * The source. + * @param sink + * The sink (required). + * @param sink2 + * Another sink (optional). + * @param constraints + * Binding sets which fail these constraints will NOT be copied + * (optional). + * @param stats + * The {@link BOpStats#chunksIn} and {@link BOpStats#unitsIn} + * will be updated during the copy (optional). + */ + static public void copy( + final IAsynchronousIterator<IBindingSet[]> source, + final IBlockingBuffer<IBindingSet[]> sink, + final IBlockingBuffer<IBindingSet[]> sink2, + final IConstraint[] constraints, final BOpStats stats) { + + while (source.hasNext()) { + + final IBindingSet[] chunk = source.next(); + + if (stats != null) { + + stats.chunksIn.increment(); + + stats.unitsIn.add(chunk.length); + + } + + // apply optional constraints. + final IBindingSet[] tmp = applyConstraints(chunk,constraints); + +// System.err.println("Copying: "+Arrays.toString(tmp)); + + // copy accepted binding sets to the default sink. + sink.add(tmp); + + if (sink2 != null) { + // copy accepted binding sets to the alt sink. + sink2.add(tmp); + } + + } + + } + + /** + * Return a dense array containing only those {@link IBindingSet}s which + * satisfy the constraints. + * + * @param chunk + * A chunk of binding sets. + * @param constraints + * The constraints (optional). 
+ * + * @return The dense chunk of binding sets. + */ + static private IBindingSet[] applyConstraints(final IBindingSet[] chunk, + final IConstraint[] constraints) { + + if (constraints == null) { + + /* + * No constraints, copy all binding sets. + */ + + return chunk; + + } + + /* + * Copy binding sets which satisfy the constraint(s). + */ + + IBindingSet[] t = new IBindingSet[chunk.length]; + + int j = 0; + + for (int i = 0; i < chunk.length; i++) { + + final IBindingSet bindingSet = chunk[i]; + + if (BOpUtility.isConsistent(constraints, bindingSet)) { + + t[j++] = bindingSet; + + } + + } + + if (j != chunk.length) { + + // allocate exact size array. + final IBindingSet[] tmp = (IBindingSet[]) java.lang.reflect.Array + .newInstance(chunk[0].getClass(), j); + + // make a dense copy. + System.arraycopy(t/* src */, 0/* srcPos */, tmp/* dst */, + 0/* dstPos */, j/* len */); + + t = tmp; + + } + + return t; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/NV.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/NV.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/NV.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -147,10 +147,10 @@ * * @param a * The array. - * + * * @return The map. */ - static public Map<String, Object> asMap(final NV[] a) { + static public Map<String, Object> asMap(final NV... 
a) { final Map<String, Object> tmp = new LinkedHashMap<String, Object>( a.length); @@ -162,7 +162,7 @@ } return tmp; - + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -56,7 +56,7 @@ /** * The value of the annotation is the {@link BOp.Annotations#BOP_ID} of * the ancestor in the operator tree which serves as the default sink - * for binding sets (default is the parent). + * for binding sets (optional, default is the parent). */ String SINK_REF = PipelineOp.class.getName() + ".sinkRef"; Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -1,208 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. 
- -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Aug 25, 2010 - */ - -package com.bigdata.bop.bset; - -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.FutureTask; - -import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpContext; -import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.IConstraint; -import com.bigdata.bop.engine.BOpStats; -import com.bigdata.bop.engine.IChunkAccessor; -import com.bigdata.relation.accesspath.IAsynchronousIterator; -import com.bigdata.relation.accesspath.IBlockingBuffer; - -/** - * This operator copies its source to its sink. Specializations exist which are - * used to feed the the initial set of intermediate results into a pipeline ( - * {@link StartOp}) and which are used to replicate intermediate results to more - * than one sink ({@link Tee}). - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class CopyBindingSetOp extends PipelineOp { - - /** - * - */ - private static final long serialVersionUID = 1L; - - public interface Annotations extends PipelineOp.Annotations { - - /** - * An optional {@link IConstraint}[] which places restrictions on the - * legal patterns in the variable bindings. - */ - String CONSTRAINTS = CopyBindingSetOp.class.getName() + ".constraints"; - - } - - /** - * Deep copy constructor. - * - * @param op - */ - public CopyBindingSetOp(CopyBindingSetOp op) { - super(op); - } - - /** - * Shallow copy constructor. 
- * - * @param args - * @param annotations - */ - public CopyBindingSetOp(BOp[] args, Map<String, Object> annotations) { - super(args, annotations); - } - - /** - * @see Annotations#CONSTRAINTS - */ - public IConstraint[] constraints() { - - return getProperty(Annotations.CONSTRAINTS, null/* defaultValue */); - - } - - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - - return new FutureTask<Void>(new CopyTask(this, context)); - - } - - /** - * Copy the source to the sink. - * - * @todo Optimize this. When using an {@link IChunkAccessor} we should be - * able to directly output the same chunk. - */ - static private class CopyTask implements Callable<Void> { - - private final BOpContext<IBindingSet> context; - - /** - * The constraint (if any) specified for the join operator. - */ - final private IConstraint[] constraints; - - CopyTask(final CopyBindingSetOp op, - final BOpContext<IBindingSet> context) { - - this.context = context; - - this.constraints = op.constraints(); - - } - - public Void call() throws Exception { - final IAsynchronousIterator<IBindingSet[]> source = context - .getSource(); - final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); - final IBlockingBuffer<IBindingSet[]> sink2 = context.getSink2(); - try { - final BOpStats stats = context.getStats(); - while (source.hasNext()) { - final IBindingSet[] chunk = source.next(); - stats.chunksIn.increment(); - stats.unitsIn.add(chunk.length); - final IBindingSet[] tmp = applyConstraints(chunk); - sink.add(tmp); - if (sink2 != null) - sink2.add(tmp); - } - sink.flush(); - if (sink2 != null) - sink2.flush(); - return null; - } finally { - sink.close(); - if (sink2 != null) - sink2.close(); - source.close(); - } - } - - private IBindingSet[] applyConstraints(final IBindingSet[] chunk) { - - if (constraints == null) { - - /* - * No constraints, copy all binding sets. - */ - - return chunk; - - } - - /* - * Copy binding sets which satisfy the constraint(s). 
- */ - - IBindingSet[] t = new IBindingSet[chunk.length]; - - int j = 0; - - for (int i = 0; i < chunk.length; i++) { - - final IBindingSet bindingSet = chunk[i]; - - if (context.isConsistent(constraints, bindingSet)) { - - t[j++] = bindingSet; - - } - - } - - if (j != chunk.length) { - - // allocate exact size array. - final IBindingSet[] tmp = (IBindingSet[]) java.lang.reflect.Array - .newInstance(chunk[0].getClass(), j); - - // make a dense copy. - System.arraycopy(t/* src */, 0/* srcPos */, tmp/* dst */, - 0/* dstPos */, j/* len */); - - t = tmp; - - } - - return t; - - } - - } // class CopyTask - -} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyOp.java (from rev 3706, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -0,0 +1,203 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 25, 2010 + */ + +package com.bigdata.bop.bset; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.IChunkAccessor; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; + +/** + * This operator copies its source to its sink(s). Specializations exist which are + * used to feed the initial set of intermediate results into a pipeline ( + * {@link StartOp}) and which are used to replicate intermediate results to more + * than one sink ({@link Tee}). + * + * @see Annotations#SINK_REF + * @see Annotations#ALT_SINK_REF + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class CopyOp extends PipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends PipelineOp.Annotations { + + /** + * An optional {@link IConstraint}[] which places restrictions on the + * legal patterns in the variable bindings. + */ + String CONSTRAINTS = CopyOp.class.getName() + ".constraints"; + + /** + * An optional {@link IBindingSet}[] to be used <strong>instead</strong> + * of the default source. + */ + String BINDING_SETS = CopyOp.class.getName() + ".bindingSets"; + + } + + /** + * Deep copy constructor. 
+ * + * @param op + */ + public CopyOp(CopyOp op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * @param annotations + */ + public CopyOp(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + /** + * @see Annotations#CONSTRAINTS + */ + public IConstraint[] constraints() { + + return getProperty(Annotations.CONSTRAINTS, null/* defaultValue */); + + } + + /** + * @see Annotations#BINDING_SETS + */ + public IBindingSet[] bindingSets() { + + return getProperty(Annotations.BINDING_SETS, null/* defaultValue */); + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new CopyTask(this, context)); + + } + + /** + * Copy the source to the sink. + * + * @todo Optimize this. When using an {@link IChunkAccessor} we should be + * able to directly output the same chunk. + */ + static private class CopyTask implements Callable<Void> { + + private final CopyOp op; + + private final BOpContext<IBindingSet> context; + + CopyTask(final CopyOp op, + final BOpContext<IBindingSet> context) { + + this.op = op; + + this.context = context; + + } + + public Void call() throws Exception { + + // source. + final IAsynchronousIterator<IBindingSet[]> source = context + .getSource(); + + // default sink + final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); + + // optional altSink. + final IBlockingBuffer<IBindingSet[]> sink2 = context.getSink2(); + + final BOpStats stats = context.getStats(); + + final IConstraint[] constraints = op.constraints(); + + try { + + final IBindingSet[] bindingSets = op.bindingSets(); + + if (bindingSets != null) { + + // copy optional additional binding sets. + BOpUtility.copy( + new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { bindingSets }), sink, + sink2, constraints, stats); + + } else { + + // copy binding sets from the source. 
+ BOpUtility.copy(source, sink, sink2, constraints, stats); + + } + + // flush the sink. + sink.flush(); + if (sink2 != null) // and the optional altSink. + sink2.flush(); + + // Done. + return null; + + } finally { + + sink.close(); + + if (sink2 != null) + sink2.close(); + + source.close(); + + } + + } + + } // class CopyTask + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -5,10 +5,10 @@ import com.bigdata.bop.BOp; /** - * A version of {@link CopyBindingSetOp} which is always evaluated on the query + * A version of {@link CopyOp} which is always evaluated on the query * controller. */ -public class StartOp extends CopyBindingSetOp { +public class StartOp extends CopyOp { /** * Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -32,6 +32,7 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.controller.Union; import com.bigdata.bop.join.PipelineJoin; import com.bigdata.rdf.rules.TMUtility; import com.bigdata.relation.RelationFusedView; @@ -87,7 +88,7 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ -public class Tee extends CopyBindingSetOp { +public class Tee extends CopyOp { /** * Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 
=================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -1,128 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Aug 18, 2010 - */ - -package com.bigdata.bop.bset; - -import java.util.Map; -import java.util.concurrent.FutureTask; - -import com.bigdata.bop.BOpContext; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.PipelineOp; - -/** - * UNION(ops)[maxParallel(default all)] - * <p> - * Executes each of the operands in the union as a subqueries. Each subquery is - * run as a separate query but is linked to the parent query in which the UNION - * is being evaluated. The subqueries do not receive bindings from the parent - * and may be executed independently. By default, the subqueries are run with - * unlimited parallelism. - * <p> - * Note: UNION runs on the query controller. 
The - * {@link PipelineOp.Annotations#SINK_REF} of each child operand should be - * overridden to specify the parent of the UNION operator, thereby routing - * around the UNION operator itself. If you fail to do this, then the - * intermediate results of the subqueries will be routed through the UNION - * operator on the query controller. - * <p> - * UNION can not be used when intermediate results from other computations must - * be routed into subqueries. However, a {@link Tee} pattern may help in such - * cases. For example, a {@link Tee} may be used to create a union of pipeline - * joins for two access paths during truth maintenance. - * <p> - * For example: - * - * <pre> - * UNION([a,b,c],{}) - * </pre> - * - * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each - * subquery will be initialized with a single empty {@link IBindingSet}. The - * output of those subqueries will be routed to the UNION operator (their - * parent) unless the subqueries explicitly override this behavior using - * {@link PipelineOp.Annotations#SINK_REF}. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class Union extends PipelineOp { - - /** - * - */ - private static final long serialVersionUID = 1L; - - /** - * @param args - * Two or more operators whose union is desired. - * @param annotations - */ - public Union(final PipelineOp[] args, - final Map<String, Object> annotations) { - - super(args, annotations); - - if (args.length < 2) - throw new IllegalArgumentException(); - - } - - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - -// return new FutureTask<Void>(new UnionTask(this, context)); - throw new UnsupportedOperationException(); - } - -// /** -// * Pipeline union impl. -// * -// * FIXME All this does is copy its inputs to its outputs. 
Since we only run -// * one chunk of input at a time, it seems that the easiest way to implement -// * a union is to have the operators in the union just target the same sink. -// */ -// private static class UnionTask extends Haltable<Void> implements Callable<Void> { -// -// public UnionTask(// -// final Union op,// -// final BOpContext<IBindingSet> context -// ) { -// -// if (op == null) -// throw new IllegalArgumentException(); -// if (context == null) -// throw new IllegalArgumentException(); -// } -// -// public Void call() throws Exception { -// // TODO Auto-generated method stub -// throw new UnsupportedOperationException(); -// } -// -// } - -} Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -0,0 +1,360 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 18, 2010 + */ + +package com.bigdata.bop.controller; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.bset.Tee; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.engine.RunningQuery; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.util.concurrent.LatchedExecutor; + +/** + * Executes each of the operands as a subquery. The operands are evaluated in + * the order given and with the annotated parallelism. Each subquery is run as a + * separate query but is linked to the parent query in which the operator is being + * evaluated. The subqueries do not receive bindings from the parent and may be + * executed independently. By default, the subqueries are run with unlimited + * parallelism. + * <p> + * Note: This operator must run on the query controller. The + * {@link PipelineOp.Annotations#SINK_REF} of each child operand should be + * overridden to specify the parent of this operator. If you fail to do + * this, then the intermediate results of the subqueries will be routed to this + * operator, which DOES NOT pass them on. This may cause unnecessary network + * traffic. It may also cause the query to block if the buffer capacity is + * limited.
+ * <p> + * If you want to route intermediate results from other computations into + * subqueries, then consider a {@link Tee} pattern instead. + * <p> + * For example: + * + * <pre> + * SLICE[1]( + * UNION[2]([a{sinkRef=1},b{sinkRef=1},c{sinkRef=1}],{}) + * ) + * </pre> + * + * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each + * subquery will be initialized with a single empty {@link IBindingSet}. The + * output of those subqueries MUST be explicitly routed to the SLICE operator + * using {@link PipelineOp.Annotations#SINK_REF} on each of the subqueries. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +abstract public class AbstractSubqueryOp extends PipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends PipelineOp.Annotations { + + /** + * The maximum parallelism with which the subqueries will be evaluated + * (default is unlimited). + */ + String MAX_PARALLEL = AbstractSubqueryOp.class.getName() + + ".maxParallel"; + + int DEFAULT_MAX_PARALLEL = Integer.MAX_VALUE; + + } + + /** + * @see Annotations#MAX_PARALLEL + */ + public int getMaxParallel() { + return getProperty(Annotations.MAX_PARALLEL, + Annotations.DEFAULT_MAX_PARALLEL); + } + + /** + * Deep copy constructor. + */ + public AbstractSubqueryOp(final AbstractSubqueryOp op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * @param annotations + */ + public AbstractSubqueryOp(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + + if (!getEvaluationContext().equals(BOpEvaluationContext.CONTROLLER)) + throw new IllegalArgumentException(Annotations.EVALUATION_CONTEXT + + "=" + getEvaluationContext()); + + if (!getProperty(Annotations.CONTROLLER, Annotations.DEFAULT_CONTROLLER)) + throw new IllegalArgumentException(Annotations.CONTROLLER); + +// // The id of this operator (if any). 
+// final Integer thisId = (Integer)getProperty(Annotations.BOP_ID); +// +// for(BOp op : args) { +// +// final Integer sinkId = (Integer) op +// .getRequiredProperty(Annotations.SINK_REF); +// +// if(sinkId.equals(thisId)) +// throw new RuntimeException("Operand may not target ") +// +// } + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new ControllerTask(this, context)); + + } + + /** + * Evaluates the arguments of the operator as subqueries. The arguments are + * evaluated in order. An {@link Executor} with limited parallelism is used to + * evaluate the arguments. If the controller operator is interrupted, then + * the subqueries are cancelled. If a subquery fails, then all subqueries + * are cancelled. + */ + private static class ControllerTask implements Callable<Void> { + + private final AbstractSubqueryOp controllerOp; + private final BOpContext<IBindingSet> context; + private final List<FutureTask<RunningQuery>> tasks = new LinkedList<FutureTask<RunningQuery>>(); + private final CountDownLatch latch; + private final int nparallel; + private final Executor executor; + + public ControllerTask(final AbstractSubqueryOp controllerOp, final BOpContext<IBindingSet> context) { + + if (controllerOp == null) + throw new IllegalArgumentException(); + + if (context == null) + throw new IllegalArgumentException(); + + this.controllerOp = controllerOp; + + this.context = context; + + this.nparallel = controllerOp.getProperty(Annotations.MAX_PARALLEL, + Annotations.DEFAULT_MAX_PARALLEL); + + this.executor = new LatchedExecutor(context.getIndexManager() + .getExecutorService(), nparallel); + + this.latch = new CountDownLatch(controllerOp.arity()); + + /* + * Create FutureTasks for each subquery. The futures are not submitted + * to the Executor yet. That happens in call(). By deferring the + * evaluation until call() we gain the ability to cancel all + * subqueries if any subquery fails.
+ */ + for (BOp op : controllerOp.args()) { + + /* + * Task runs subquery and cancels all subqueries in [tasks] if + * it fails. + */ + tasks.add(new FutureTask<RunningQuery>(new SubqueryTask(op, + context)) { + /* + * Hook future to count down the latch when the task is + * done. + */ + public void run() { + try { + super.run(); + } finally { + latch.countDown(); + } + } + }); + + } + + } + + /** + * Evaluate the subqueries with limited parallelism. + */ + public Void call() throws Exception { + + try { + + /* + * Run subqueries with limited parallelism. + */ + for (FutureTask<RunningQuery> ft : tasks) { + executor.execute(ft); + } + + /* + * Close the source. Controllers do not accept inputs from the + * pipeline. + */ + context.getSource().close(); + + /* + * Wait for all subqueries to complete. + */ + latch.await(); + + /* + * Get the futures, throwing out any errors. + */ + for (FutureTask<RunningQuery> ft : tasks) + ft.get(); + + // Now that we know the subqueries ran Ok, flush the sink. + context.getSink().flush(); + + // Done. + return null; + + } finally { + + // Cancel any tasks which are still running. + cancelTasks(); + + context.getSink().close(); + + if (context.getSink2() != null) + context.getSink2().close(); + + } + + } + + /** + * Cancel any running tasks. + */ + private void cancelTasks() { + + for (FutureTask<RunningQuery> ft : tasks) + ft.cancel(true/* mayInterruptIfRunning */); + + } + + /** + * Run a subquery. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private class SubqueryTask implements Callable<RunningQuery> { + + /** + * The evaluation context for the parent query. + */ + private final BOpContext<IBindingSet> parentContext; + + /** + * The root operator for the subquery. 
+ */ + private final BOp subQueryOp; + + public SubqueryTask(final BOp subQuery, + final BOpContext<IBindingSet> parentContext) { + + this.subQueryOp = subQuery; + + this.parentContext = parentContext; + + } + + public RunningQuery call() throws Exception { + + IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = null; + try { + + final QueryEngine queryEngine = parentContext.getRunningQuery() + .getQueryEngine(); + + final RunningQuery runningQuery = queryEngine + .eval(subQueryOp); + + // Iterator visiting the subquery solutions. + subquerySolutionItr = runningQuery.iterator(); + + // Copy solutions from the subquery to the query. + BOpUtility.copy(subquerySolutionItr, parentContext + .getSink(), null/* sink2 */, null/* constraints */, + null/* stats */); + + // wait for the subquery. + runningQuery.get(); + + // done. + return runningQuery; + + } catch (Throwable t) { + + // If a subquery fails, then cancel all of the subqueries. + ControllerTask.this.cancelTasks(); + + // rethrow the exception. 
+ throw new RuntimeException(t); + + } finally { + + if (subquerySolutionItr != null) + subquerySolutionItr.close(); + + } + + } + + } // SubqueryTask + + } // ControllerTask + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java 2010-09-30 20:39:15 UTC (rev 3706) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -25,7 +25,7 @@ * Created on Aug 16, 2010 */ -package com.bigdata.bop.eval; +package com.bigdata.bop.controller; import java.io.Serializable; import java.util.LinkedList; Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -0,0 +1,84 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 18, 2010 + */ + +package com.bigdata.bop.controller; + +import java.util.Map; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.PipelineOp; + +/** + * STEPS(ops) + * + * <pre> + * STEPS([a,b,c],{}) + * </pre> + * + * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in sequence. Each + * subquery will be initialized with a single empty {@link IBindingSet}. The + * output of those subqueries will be routed to the STEPS operator (their + * parent) unless the subqueries explicitly override this behavior using + * {@link PipelineOp.Annotations#SINK_REF}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class Steps extends AbstractSubqueryOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Deep copy constructor. + */ + public Steps(Steps op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * The operators to be evaluated in sequence.
+ * @param annotations + */ + public Steps(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + + if (getMaxParallel() != 1) + throw new IllegalArgumentException(Annotations.MAX_PARALLEL + "=" + + getMaxParallel()); + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java (from rev 3740, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -0,0 +1,83 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 18, 2010 + */ + +package com.bigdata.bop.controller; + +import java.util.Map; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.PipelineOp; + +/** + * UNION(ops)[maxParallel(default all)] + * <pre> + * UNION([a,b,c],{}) + * </pre> + * + * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each + * subquery will be initialized with a single empty {@link IBindingSet}. The + * output of those subqueries will be routed to the UNION operator (their + * parent) unless the subqueries explicitly override this behavior using + * {@link PipelineOp.Annotations#SINK_REF}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class Union extends AbstractSubqueryOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Deep copy constructor. + * + * @param op + */ + public Union(final Union op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * Two or more operators whose union is desired. + * @param annotations + */ + public Union(final BOp[] args, final Map<String, Object> annotations) { + + super(args, annotations); + + if (args.length < 2) + throw new IllegalArgumentException(); + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -56,6 +56,11 @@ IIndexManager getIndexManager(); /** + * The query engine. 
This may be used to submit subqueries for evaluation. + */ + QueryEngine getQueryEngine(); + + /** * Cancel the running query (normal termination). * <p> * Note: This method provides a means for an operator to indicate that the Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-07 ... [truncated message content] |