From: <tho...@us...> - 2011-03-30 18:02:03
|
Revision: 4351 http://bigdata.svn.sourceforge.net/bigdata/?rev=4351&view=rev Author: thompsonbry Date: 2011-03-30 18:01:55 +0000 (Wed, 30 Mar 2011) Log Message: ----------- Commit of partial progress towards a subquery hash join implementation. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestSubqueryOp.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/AbstractSubqueryTestCase.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestSubqueryHashJoinOp.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-03-30 17:08:25 UTC (rev 4350) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-03-30 18:01:55 UTC (rev 4351) @@ -38,6 +38,7 @@ import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.QueryEngine; import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.bop.solutions.SortOp.Annotations; import com.bigdata.relation.accesspath.IAsynchronousIterator; /** @@ -160,33 +161,18 @@ */ int DEFAULT_PIPELINE_QUEUE_CAPACITY = 10; - /** + /** * Annotation used to mark pipelined (aka vectored) operators. When * <code>false</code> the operator will use either "at-once" or * "blocked" evaluation depending on how it buffers its data for * evaluation. 
- */ - String PIPELINED = (PipelineOp.class.getName() + ".pipelined").intern(); - - /** - * @see #PIPELINED - */ - boolean DEFAULT_PIPELINED = true; - - /** - * For non-{@link #PIPELINED} operators, this non-negative value - * specifies the maximum #of bytes which the operator may buffer on the - * native heap before evaluation of the operator is triggered -or- ZERO - * (0) if the operator buffers the data on the Java heap (default - * {@value #DEFAULT_MAX_MEMORY}). When non-zero, the #of bytes specified - * should be a multiple of 4k. For a shared operation, the value is the - * maximum #of bytes which may be buffered per shard. * <p> - * Operator "at-once" evaluation will be used if either (a) the operator - * is buffering data on the Java heap; or (b) the operator is buffering - * data on the native heap and the amount of buffered data does not - * exceed the specified value for {@link #MAX_MEMORY}. For convenience, - * the value {@link Integer#MAX_VALUE} may be specified to indicate that + * When <code>false</code>, operator "at-once" evaluation will be used + * if either (a) the operator is buffering data on the Java heap; or (b) + * the operator is buffering data on the native heap and the amount of + * buffered data does not exceed the specified value for + * {@link #MAX_MEMORY}. For convenience, you may specify + * {@link Integer#MAX_VALUE} for {@link #MAX_MEMORY} to indicate that * "at-once" evaluation is required. * <p> * When data are buffered on the Java heap, "at-once" evaluation is @@ -202,6 +188,21 @@ * semantics. Such operators MUST throw an exception if the value of * this annotation could result in multiple evaluation passes. 
*/ + String PIPELINED = (PipelineOp.class.getName() + ".pipelined").intern(); + + /** + * @see #PIPELINED + */ + boolean DEFAULT_PIPELINED = true; + + /** + * For non-{@link #PIPELINED} operators, this non-negative value + * specifies the maximum #of bytes which the operator may buffer on the + * native heap before evaluation of the operator is triggered -or- ZERO + * (0) if the operator buffers the data on the Java heap (default + * {@value #DEFAULT_MAX_MEMORY}). For a sharded operation, the value is + * the maximum #of bytes which may be buffered per shard. + */ String MAX_MEMORY = (PipelineOp.class.getName() + ".maxMemory").intern(); /** @@ -456,4 +457,24 @@ } + /** + * Assert that this operator is annotated as an "at-once" operator which + * buffers its data on the java heap. + */ + protected void assertAtOnceJavaHeapOp() { + + // operator may not be broken into multiple tasks. + if (getMaxParallel() != 1) { + throw new UnsupportedOperationException(Annotations.MAX_PARALLEL + + "=" + getMaxParallel()); + } + + // operator is "at-once" (not pipelined). + if (isPipelined()) { + throw new UnsupportedOperationException(Annotations.PIPELINED + "=" + + isPipelined()); + } + + } + } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java 2011-03-30 18:01:55 UTC (rev 4351) @@ -0,0 +1,424 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 18, 2010 + */ + +package com.bigdata.bop.controller; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.IPredicate; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.engine.IRunningQuery; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.relation.accesspath.IAsynchronousIterator; + +/** + * Hash join with subquery. + * <p> + * All source solutions are fully materialized in a hash table. The keys of the + * hash table are the as-bound join variable(s). The values in the hash table is + * the list of solutions having a specific value for the as-bound join + * variables. Once all solutions are materialized, the subquery is evaluated + * once. For each solution materialized by the subquery, the operator probes the + * hash table using the as-bound join variables for the subquery solution. If + * there is a hit in the hash table, then operator then outputs the cross + * product of the subquery solution with the solutions list found under that key + * in the hash table, applying any optional CONSTRAINTS. 
+ * <p> + * In order to support OPTIONAL semantics for the subquery, a bit flag must be + * carried for each entry in the hash table. Once the subquery solutions have + * been exhausted, if the bit was never set for some entry and the subquery is + * optional, then the solutions associated with that entry are output, applying + * any optional CONSTRAINTS. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class SubqueryHashJoinOp extends PipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends PipelineOp.Annotations { + + /** + * The join variables (required). This is an {@link IVariable}[] with + * at least one variable. The order of the entries is used when forming + * the as-bound keys for the hash table. Duplicate elements and null + * elements are not permitted. + */ + String JOIN_VARS = SubqueryHashJoinOp.class.getName() + ".subquery"; + + /** + * The subquery to be evaluated (required). This should be a + * {@link PipelineOp}. (It is basically the equivalent of the + * {@link IPredicate} for a {@link PipelineJoin}). + */ + String SUBQUERY = SubqueryHashJoinOp.class.getName() + ".subquery"; + + /** + * An optional {@link IVariable}[] identifying the variables to be + * retained in the {@link IBindingSet}s written out by the operator. All + * variables are retained unless this annotation is specified. + * + * @todo This should be on {@link SubqueryOp} as well. + */ + String SELECT = SubqueryHashJoinOp.class.getName() + ".select"; + + /** + * An {@link IConstraint}[] which places restrictions on the legal + * patterns in the variable bindings (optional). + * + * @todo This should be on {@link SubqueryOp} as well. 
+ */ + String CONSTRAINTS = SubqueryHashJoinOp.class.getName() + ".constraints"; + + /** + * When <code>true</code> the subquery has optional semantics (if the + * subquery fails, the original binding set will be passed along to the + * downstream sink anyway) (default {@value #DEFAULT_OPTIONAL}). + */ + String OPTIONAL = SubqueryHashJoinOp.class.getName() + ".optional"; + + boolean DEFAULT_OPTIONAL = false; + + } + +// /** +// * @see Annotations#MAX_PARALLEL +// */ +// public int getMaxParallel() { +// return getProperty(Annotations.MAX_PARALLEL, +// Annotations.DEFAULT_MAX_PARALLEL); +// } + + /** + * Deep copy constructor. + */ + public SubqueryHashJoinOp(final SubqueryHashJoinOp op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * @param annotations + */ + public SubqueryHashJoinOp(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + +// if (!getEvaluationContext().equals(BOpEvaluationContext.CONTROLLER)) +// throw new IllegalArgumentException(Annotations.EVALUATION_CONTEXT +// + "=" + getEvaluationContext()); + + final IVariable<?>[] joinVars = (IVariable[]) getRequiredProperty(Annotations.JOIN_VARS); + + if (joinVars.length == 0) + throw new IllegalArgumentException(Annotations.JOIN_VARS); + + for (IVariable<?> var : joinVars) { + + if (var == null) + throw new IllegalArgumentException(Annotations.JOIN_VARS); + + } + + getRequiredProperty(Annotations.SUBQUERY); + + assertAtOnceJavaHeapOp(); + +// if (!getProperty(Annotations.CONTROLLER, Annotations.DEFAULT_CONTROLLER)) +// throw new IllegalArgumentException(Annotations.CONTROLLER); + +// // The id of this operator (if any). 
+// final Integer thisId = (Integer)getProperty(Annotations.BOP_ID); +// +// for(BOp op : args) { +// +// final Integer sinkId = (Integer) op +// .getRequiredProperty(Annotations.SINK_REF); +// +// if(sinkId.equals(thisId)) +// throw new RuntimeException("Operand may not target ") +// +// } + + } + + public SubqueryHashJoinOp(final BOp[] args, NV... annotations) { + + this(args, NV.asMap(annotations)); + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new ControllerTask(this, context)); + + } + + /** + * Evaluates the arguments of the operator as subqueries. The arguments are + * evaluated in order. An {@link Executor} with limited parallelism to + * evaluate the arguments. If the controller operator is interrupted, then + * the subqueries are cancelled. If a subquery fails, then all subqueries + * are cancelled. + */ + private static class ControllerTask implements Callable<Void> { + + private final BOpContext<IBindingSet> context; + private final boolean optional; + private final PipelineOp subquery; + + public ControllerTask(final SubqueryHashJoinOp controllerOp, final BOpContext<IBindingSet> context) { + + if (controllerOp == null) + throw new IllegalArgumentException(); + + if (context == null) + throw new IllegalArgumentException(); + + this.context = context; + + this.optional = controllerOp.getProperty(Annotations.OPTIONAL, + Annotations.DEFAULT_OPTIONAL); + + this.subquery = (PipelineOp) controllerOp + .getRequiredProperty(Annotations.SUBQUERY); + + } + + /** + * Evaluate the subquery. + * + * @todo Support limited parallelism for each binding set read from the + * source. We will need to keep track of the running subqueries in + * order to wait on them before returning from this method and in + * order to cancel them if something goes wrong. 
+ */ + public Void call() throws Exception { + + try { + + final IAsynchronousIterator<IBindingSet[]> sitr = context + .getSource(); + + while(sitr.hasNext()) { + + final IBindingSet[] chunk = sitr.next(); + + for(IBindingSet bset : chunk) { + + final IRunningQuery runningSubquery = new SubqueryTask( + bset, subquery, context).call(); + + if (!runningSubquery.isDone()) { + + throw new AssertionError("Future not done: " + + runningSubquery.toString()); + + } + + } + + } + + // Now that we know the subqueries ran Ok, flush the sink. + context.getSink().flush(); + + // Done. + return null; + + } finally { + + context.getSource().close(); + + context.getSink().close(); + + if (context.getSink2() != null) + context.getSink2().close(); + + } + + } + + /** + * Run a subquery. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private class SubqueryTask implements Callable<IRunningQuery> { + + /** + * The evaluation context for the parent query. + */ + private final BOpContext<IBindingSet> parentContext; + + /** + * The source binding set. This will be copied to the output if + * there are no solutions for the subquery (optional join + * semantics). + */ + private final IBindingSet bset; + + /** + * The root operator for the subquery. + */ + private final BOp subQueryOp; + + public SubqueryTask(final IBindingSet bset, final BOp subQuery, + final BOpContext<IBindingSet> parentContext) { + + this.bset = bset; + + this.subQueryOp = subQuery; + + this.parentContext = parentContext; + + } + + public IRunningQuery call() throws Exception { + + // The subquery + IRunningQuery runningSubquery = null; + // The iterator draining the subquery + IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = null; + try { + + final QueryEngine queryEngine = parentContext.getRunningQuery() + .getQueryEngine(); + + runningSubquery = queryEngine.eval((PipelineOp) subQueryOp, + bset); + + long ncopied = 0L; + try { + + // Iterator visiting the subquery solutions. 
+ subquerySolutionItr = runningSubquery.iterator(); + + // Copy solutions from the subquery to the query. + ncopied = BOpUtility.copy(subquerySolutionItr, + parentContext.getSink(), null/* sink2 */, + null/* constraints */, null/* stats */); + + // wait for the subquery to halt / test for errors. + runningSubquery.get(); + + } catch (InterruptedException ex) { + + // this thread was interrupted, so cancel the subquery. + runningSubquery + .cancel(true/* mayInterruptIfRunning */); + + // rethrow the exception. + throw ex; + + } + + if (ncopied == 0L && optional) { + + /* + * Since there were no solutions for the subquery, copy + * the original binding set to the default sink. + * + * @todo If we add a CONSTRAINTS annotation to the + * SubqueryOp then we need to make sure that it is + * applied to all solutions copied out of the subquery. + */ + + parentContext.getSink().add(new IBindingSet[]{bset}); + + } + + // done. + return runningSubquery; + + } catch (Throwable t) { + + if (runningSubquery == null + || runningSubquery.getCause() != null) { + /* + * If things fail before we start the subquery, or if a + * subquery fails (due to abnormal termination), then + * propagate the error to the parent and rethrow the + * first cause error out of the subquery. + * + * Note: IHaltable#getCause() considers exceptions + * triggered by an interrupt to be normal termination. + * Such exceptions are NOT propagated here and WILL NOT + * cause the parent query to terminate. + */ + throw new RuntimeException(ControllerTask.this.context + .getRunningQuery().halt( + runningSubquery == null ? t + : runningSubquery.getCause())); + } + + return runningSubquery; + + } finally { + + try { + + // ensure subquery is halted. + if (runningSubquery != null) + runningSubquery + .cancel(true/* mayInterruptIfRunning */); + + } finally { + + // ensure the subquery solution iterator is closed. 
+ if (subquerySolutionItr != null) + subquerySolutionItr.close(); + + } + + } + + } + + } // SubqueryTask + + } // ControllerTask + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-03-30 17:08:25 UTC (rev 4350) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-03-30 18:01:55 UTC (rev 4351) @@ -36,13 +36,17 @@ import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpUtility; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IPredicate; import com.bigdata.bop.NV; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.join.PipelineJoin; import com.bigdata.relation.accesspath.IAsynchronousIterator; /** + * Pipelined join with subquery. + * <p> * For each binding set presented, this operator executes a subquery. Any * solutions produced by the subquery are copied to the default sink. If no * solutions are produced and {@link Annotations#OPTIONAL} is <code>true</code>, @@ -51,12 +55,14 @@ * the parent query is cancelled. * * @todo Parallel evaluation of subqueries is not implemented. What is the - * appropriate parallelism for this operator? More parallelism should reduce - * latency but could increase the memory burden. Review this decision once we - * have the RWStore operating as a binding set buffer on the Java process heap. + * appropriate parallelism for this operator? More parallelism should + * reduce latency but could increase the memory burden. Review this + * decision once we have the RWStore operating as a binding set buffer on + * the Java process heap. * + * @todo Rename as SubqueryPipelineJoinOp and support for SELECT and CONSTRAINTS. 
+ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ * * @see AbstractSubqueryOp */ @@ -69,11 +75,12 @@ public interface Annotations extends PipelineOp.Annotations { - /** - * The subquery to be evaluated for each binding sets presented to the - * {@link SubqueryOp} (required). This should be a - * {@link PipelineOp}. - */ + /** + * The subquery to be evaluated for each binding sets presented to the + * {@link SubqueryOp} (required). This should be a {@link PipelineOp}. + * (It is basically the equivalent of the {@link IPredicate} for a + * {@link PipelineJoin}). + */ String SUBQUERY = (SubqueryOp.class.getName() + ".subquery").intern(); /** @@ -170,13 +177,11 @@ // // } - /** - * Evaluates the arguments of the operator as subqueries. The arguments are - * evaluated in order. An {@link Executor} with limited parallelism to - * evaluate the arguments. If the controller operator is interrupted, then - * the subqueries are cancelled. If a subquery fails, then all subqueries - * are cancelled. - */ + /** + * Evaluates the subquery for each source binding set. If the controller + * operator is interrupted, then the subqueries are cancelled. If a subquery + * fails, then all subqueries are cancelled. + */ private static class ControllerTask implements Callable<Void> { // private final SubqueryOp controllerOp; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java 2011-03-30 17:08:25 UTC (rev 4350) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java 2011-03-30 18:01:55 UTC (rev 4351) @@ -74,18 +74,8 @@ + getEvaluationContext()); } - // operator may not be broken into multiple tasks. 
- if (getMaxParallel() != 1) { - throw new UnsupportedOperationException(Annotations.MAX_PARALLEL - + "=" + getMaxParallel()); - } + assertAtOnceJavaHeapOp(); - // operator is "at-once" (not pipelined). - if (isPipelined()) { - throw new UnsupportedOperationException(Annotations.PIPELINED + "=" - + isPipelined()); - } - } public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/AbstractSubqueryTestCase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/AbstractSubqueryTestCase.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/AbstractSubqueryTestCase.java 2011-03-30 18:01:55 UTC (rev 4351) @@ -0,0 +1,223 @@ +package com.bigdata.bop.controller; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import com.bigdata.bop.IBindingSet; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; +import com.bigdata.striterator.ICloseableIterator; + +import junit.framework.TestCase2; + +/** + * Abstract base class for subquery join test suites. + * + * @author thompsonbry + */ +abstract public class AbstractSubqueryTestCase extends TestCase2 { + + public AbstractSubqueryTestCase() { + } + + public AbstractSubqueryTestCase(String name) { + super(name); + } + + + /** + * Return an {@link IAsynchronousIterator} that will read a single, + * empty {@link IBindingSet}. + * + * @param bindingSet + * the binding set. 
+ */ + protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet bindingSet) { + + return new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { new IBindingSet[] { bindingSet } }); + + } + + /** + * Return an {@link IAsynchronousIterator} that will read a single, chunk + * containing all of the specified {@link IBindingSet}s. + * + * @param bindingSets + * the binding sets. + */ + protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet[] bindingSets) { + + return new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { bindingSets }); + + } + + /** + * Return an {@link IAsynchronousIterator} that will read a single, chunk + * containing all of the specified {@link IBindingSet}s. + * + * @param bindingSetChunks + * the chunks of binding sets. + */ + protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet[][] bindingSetChunks) { + + return new ThickAsynchronousIterator<IBindingSet[]>(bindingSetChunks); + + } + + /** + * Verify the expected solutions. 
+ * + * @param expected + * @param itr + */ + static public void assertSameSolutions(final IBindingSet[] expected, + final IAsynchronousIterator<IBindingSet[]> itr) { + try { + int n = 0; + while (itr.hasNext()) { + final IBindingSet[] e = itr.next(); + if (log.isInfoEnabled()) + log.info(n + " : chunkSize=" + e.length); + for (int i = 0; i < e.length; i++) { + if (log.isInfoEnabled()) + log.info(n + " : " + e[i]); + if (n >= expected.length) { + fail("Willing to deliver too many solutions: n=" + n + + " : " + e[i]); + } + if (!expected[n].equals(e[i])) { + fail("n=" + n + ", expected=" + expected[n] + + ", actual=" + e[i]); + } + n++; + } + } + assertEquals("Wrong number of solutions", expected.length, n); + } finally { + itr.close(); + } + } + + /** + * Verifies that the iterator visits the specified objects in some arbitrary + * ordering and that the iterator is exhausted once all expected objects + * have been visited. The implementation uses a selection without + * replacement "pattern". + * <p> + * Note: If the objects being visited do not correctly implement hashCode() + * and equals() then this can fail even if the desired objects would be + * visited. When this happens, fix the implementation classes. + */ + static public <T> void assertSameSolutionsAnyOrder(final T[] expected, + final Iterator<T> actual) { + + assertSameSolutionsAnyOrder("", expected, actual); + + } + + /** + * Verifies that the iterator visits the specified objects in some arbitrary + * ordering and that the iterator is exhausted once all expected objects + * have been visited. The implementation uses a selection without + * replacement "pattern". + * <p> + * Note: If the objects being visited do not correctly implement hashCode() + * and equals() then this can fail even if the desired objects would be + * visited. When this happens, fix the implementation classes. 
+ */ + static public <T> void assertSameSolutionsAnyOrder(final String msg, + final T[] expected, final Iterator<T> actual) { + + try { + + /* + * Populate a map that we will use to realize the match and + * selection without replacement logic. The map uses counters to + * handle duplicate keys. This makes it possible to write tests in + * which two or more binding sets which are "equal" appear. + */ + + final int nrange = expected.length; + + final java.util.Map<T, AtomicInteger> range = new java.util.LinkedHashMap<T, AtomicInteger>(); + + for (int j = 0; j < nrange; j++) { + + AtomicInteger count = range.get(expected[j]); + + if (count == null) { + + count = new AtomicInteger(); + + } + + range.put(expected[j], count); + + count.incrementAndGet(); + + } + + // Do selection without replacement for the objects visited by + // iterator. + + for (int j = 0; j < nrange; j++) { + + if (!actual.hasNext()) { + + fail(msg + + ": Iterator exhausted while expecting more object(s)" + + ": index=" + j); + + } + + final T actualObject = actual.next(); + + if (log.isInfoEnabled()) + log.info("visting: " + actualObject); + + AtomicInteger counter = range.get(actualObject); + + if (counter == null || counter.get() == 0) { + + fail("Object not expected" + ": index=" + j + ", object=" + + actualObject); + + } + + counter.decrementAndGet(); + + } + + if (actual.hasNext()) { + + final List<T> remainder = new LinkedList<T>(); + + while(actual.hasNext()) { + remainder.add(actual.next()); + } + + fail("Iterator will deliver too many objects: reminder(" + + remainder.size() + ")=" + remainder); + + } + + } finally { + + if (actual instanceof ICloseableIterator<?>) { + + ((ICloseableIterator<T>) actual).close(); + + } + + } + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java 
2011-03-30 17:08:25 UTC (rev 4350) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java 2011-03-30 18:01:55 UTC (rev 4351) @@ -67,6 +67,8 @@ // suite.addTestSuite(TestSteps.class); suite.addTestSuite(TestSubqueryOp.class); + + suite.addTestSuite(TestSubqueryHashJoinOp.class); // @todo test STAR (transitive closure). // suite.addTestSuite(TestStar.class); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestSubqueryHashJoinOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestSubqueryHashJoinOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestSubqueryHashJoinOp.java 2011-03-30 18:01:55 UTC (rev 4351) @@ -0,0 +1,940 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on March 30, 2011 + */ +package com.bigdata.bop.controller; + +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.IVariableOrConstant; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.Var; +import com.bigdata.bop.IPredicate.Annotations; +import com.bigdata.bop.ap.E; +import com.bigdata.bop.ap.Predicate; +import com.bigdata.bop.ap.R; +import com.bigdata.bop.bindingSet.ArrayBindingSet; +import com.bigdata.bop.bindingSet.HashBindingSet; +import com.bigdata.bop.bset.ConditionalRoutingOp; +import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.constraint.Constraint; +import com.bigdata.bop.constraint.NEConstant; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.IChunkMessage; +import com.bigdata.bop.engine.IRunningQuery; +import com.bigdata.bop.engine.LocalChunkMessage; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.ITx; +import com.bigdata.journal.Journal; +import com.bigdata.striterator.ChunkedArrayIterator; +import com.bigdata.striterator.Dechunkerator; + +/** + * Unit tests for {@link SubqueryHashJoinOp}. + * + * @author thompsonbry + * + * TODO There should be a standard join test case (w/o optional) for + * both {@link SubqueryHashJoinOp} and {@link SubqueryOp}. + * + * TODO There should be a test case using constraints. 
+ * + * TODO There should be a test case using SELECT to drop variables which + * are not required in the generated binding sets (and that feature + * should be part of the {@link SubqueryOp} as well). + */ +public class TestSubqueryHashJoinOp extends AbstractSubqueryTestCase { + + public TestSubqueryHashJoinOp() { + super(); + } + + public TestSubqueryHashJoinOp(final String name) { + super(name); + } + + @Override + public Properties getProperties() { + + final Properties p = new Properties(super.getProperties()); + + p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient + .toString()); + + return p; + + } + + static private final String namespace = "ns"; + private Journal jnl; + private QueryEngine queryEngine; + + public void setUp() throws Exception { + + jnl = new Journal(getProperties()); + + loadData(jnl); + + queryEngine = new QueryEngine(jnl); + + queryEngine.init(); + + } + + /** + * Create and populate relation in the {@link #namespace}. + */ + private void loadData(final Journal store) { + + // create the relation. + final R rel = new R(store, namespace, ITx.UNISOLATED, new Properties()); + rel.create(); + + // data to insert (in key order for convenience). + final E[] a = {// + new E("Paul", "Mary"),// [0] + new E("Paul", "Brad"),// [1] + + new E("John", "Mary"),// [2] + new E("John", "Brad"),// [3] + + new E("Mary", "Brad"),// [4] + + new E("Brad", "Fred"),// [5] + new E("Brad", "Leon"),// [6] + }; + + // insert data (the records are not pre-sorted). + rel.insert(new ChunkedArrayIterator<E>(a.length, a, null/* keyOrder */)); + + // Do commit since not scale-out. + store.commit(); + + } + + public void tearDown() throws Exception { + + if (queryEngine != null) { + queryEngine.shutdownNow(); + queryEngine = null; + } + + if (jnl != null) { + jnl.destroy(); + jnl = null; + } + + } + + /** + * TODO Verify required and optional arguments, including "at-once". 
+ */ + public void test_ctor() { + + fail("write tests"); + + } + + /** + * Unit test for optional join group. Three joins are used and target a + * {@link SliceOp}. The 2nd and 3rd joins are embedded in a + * {@link SubqueryHashJoinOp}. + * <P> + * The optional join group takes the form: + * + * <pre> + * (a b) + * optional { + * (b c) + * (c d) + * } + * </pre> + * + * The (a b) tail will match everything in the knowledge base. The join + * group takes us two hops out from ?b. There should be four solutions that + * succeed the optional join group: + * + * <pre> + * (paul mary brad fred) + * (paul mary brad leon) + * (john mary brad fred) + * (john mary brad leon) + * </pre> + * + * and five more that don't succeed the optional join group: + * + * <pre> + * (paul brad) * + * (john brad) * + * (mary brad) * + * (brad fred) + * (brad leon) + * </pre> + * + * In the cases marked with a <code>*</code>, ?c will become temporarily + * bound to fred and leon (since brad knows fred and leon), but the (c d) + * tail will fail since fred and leon don't know anyone else. At this point, + * the ?c binding must be removed from the solution. + */ + public void test_query_join2_optionals() throws Exception { + + // main query + final int startId = 1; // + final int joinId1 = 2; // : base join group.
+ final int predId1 = 3; // (a b) + final int joinGroup1 = 9; + final int sliceId = 8; // + + // subquery + final int joinId2 = 4; // : joinGroup1 + final int predId2 = 5; // (b c) + final int joinId3 = 6; // : joinGroup1 + final int predId3 = 7; // (c d) + + final IVariable<?> a = Var.var("a"); + final IVariable<?> b = Var.var("b"); + final IVariable<?> c = Var.var("c"); + final IVariable<?> d = Var.var("d"); + + final PipelineOp startOp = new StartOp(new BOp[] {}, + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + new NV(SliceOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final Predicate<?> pred1Op = new Predicate<E>( + new IVariableOrConstant[] { a, b }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId1),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred2Op = new Predicate<E>( + new IVariableOrConstant[] { b, c }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId2),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred3Op = new Predicate<E>( + new IVariableOrConstant[] { c, d }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId3),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final PipelineOp join1Op = new PipelineJoin<E>(// + new BOp[]{startOp},// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + new NV(PipelineJoin.Annotations.PREDICATE,pred1Op)); + + final PipelineOp subQuery; + { + final PipelineOp join2Op = new PipelineJoin<E>(// + new BOp[] { /*join1Op*/ },// + new NV(Predicate.Annotations.BOP_ID, joinId2),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new 
NV(PipelineJoin.Annotations.PREDICATE, pred2Op)// +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + final PipelineOp join3Op = new PipelineJoin<E>(// + new BOp[] { join2Op },// + new NV(Predicate.Annotations.BOP_ID, joinId3),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred3Op)// +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + subQuery = join3Op; + } + + final PipelineOp joinGroup1Op = new SubqueryHashJoinOp(new BOp[]{join1Op}, + new NV(Predicate.Annotations.BOP_ID, joinGroup1),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(SubqueryOp.Annotations.SUBQUERY, subQuery),// +// , new NV(BOp.Annotations.CONTROLLER,true)// +// new NV(BOp.Annotations.EVALUATION_CONTEXT, +// BOpEvaluationContext.CONTROLLER)// + // join is optional. + new NV(SubqueryOp.Annotations.OPTIONAL, true)// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + final PipelineOp sliceOp = new SliceOp(// + new BOp[]{joinGroup1Op}, + NV.asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, sliceId),// + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + new NV(PipelineOp.Annotations.SHARED_STATE,true),// + })); + + final PipelineOp query = sliceOp; + + // start the query. 
+ final UUID queryId = UUID.randomUUID(); + final IChunkMessage<IBindingSet> initialChunkMessage; + { + + final IBindingSet initialBindings = new HashBindingSet(); + +// initialBindings.set(Var.var("x"), new Constant<String>("Mary")); + + initialChunkMessage = new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, startId,// + -1, // partitionId + newBindingSetIterator(initialBindings)); + } + final IRunningQuery runningQuery = queryEngine.eval(queryId, query, + initialChunkMessage); + + // verify solutions. + { + + // the expected solutions. + final IBindingSet[] expected = new IBindingSet[] {// + // four solutions where the optional join succeeds. + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Leon") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Leon") }// + ), + // plus anything we read from the first access path which did not + // pass the optional join + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new 
Constant<String>("Mary"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Brad"), + new Constant<String>("Leon") }// + ) + }; + + /* + * junit.framework.AssertionFailedError: Iterator will deliver too + * many objects: reminder(3)=[{ a=John, b=Brad }, { a=Mary, b=Brad + * }, { a=Paul, b=Brad }]. + */ + assertSameSolutionsAnyOrder(expected, + new Dechunkerator<IBindingSet>(runningQuery.iterator())); + + } + + // Wait until the query is done. + runningQuery.get(); + final Map<Integer, BOpStats> statsMap = runningQuery.getStats(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(4, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + } + + /** + * Unit test for optional join group with a filter. Three joins are used and + * target a {@link SliceOp}. The 2nd and 3rd joins are embedded in an + * optional join group. The optional join group contains a filter. + * <p> + * The optional join group takes the form: + * + * <pre> + * (a b) + * optional { + * (b c) + * (c d) + * filter(d != Leon) + * } + * </pre> + * + * The (a b) tail will match everything in the knowledge base. The join + * group takes us two hops out from ?b. There should be two solutions that + * succeed the optional join group: + * + * <pre> + * (paul mary brad fred) + * (john mary brad fred) + * </pre> + * + * and five more that don't succeed the optional join group: + * + * <pre> + * (paul brad) * + * (john brad) * + * (mary brad) * + * (brad fred) + * (brad leon) + * </pre> + * + * In the cases marked with a <code>*</code>, ?c will become temporarily + * bound to fred and leon (since brad knows fred and leon), but the (c d) + * tail will fail since fred and leon don't know anyone else. 
At this point, + * the ?c binding must be removed from the solution. + * <p> + * The filter (d != Leon) will prune the two solutions: + * + * <pre> + * (paul mary brad leon) + * (john mary brad leon) + * </pre> + * + * since ?d is bound to Leon in those cases. + */ + public void test_query_optionals_filter() throws Exception { + + // main query + final int startId = 1; + final int joinId1 = 2; // + final int predId1 = 3; // (a,b) + final int joinGroup1 = 9; + final int sliceId = 8; + + // subquery + final int joinId2 = 4; // : group1 + final int predId2 = 5; // (b,c) + final int joinId3 = 6; // : group1 + final int predId3 = 7; // (c,d) + + + final IVariable<?> a = Var.var("a"); + final IVariable<?> b = Var.var("b"); + final IVariable<?> c = Var.var("c"); + final IVariable<?> d = Var.var("d"); + + final PipelineOp startOp = new StartOp(new BOp[] {}, + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + new NV(SliceOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final Predicate<?> pred1Op = new Predicate<E>( + new IVariableOrConstant[] { a, b }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId1),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred2Op = new Predicate<E>( + new IVariableOrConstant[] { b, c }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId2),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred3Op = new Predicate<E>( + new IVariableOrConstant[] { c, d }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId3),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final PipelineOp join1Op = new 
PipelineJoin<E>(// + new BOp[]{startOp},// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + new NV(PipelineJoin.Annotations.PREDICATE,pred1Op)); + + final PipelineOp subQuery; + { + final PipelineOp join2Op = new PipelineJoin<E>(// + new BOp[] { /*join1Op*/ },// + new NV(Predicate.Annotations.BOP_ID, joinId2),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred2Op)// +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + final PipelineOp join3Op = new PipelineJoin<E>(// + new BOp[] { join2Op },// + new NV(Predicate.Annotations.BOP_ID, joinId3),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred3Op),// + // constraint d != Leon + new NV(PipelineJoin.Annotations.CONSTRAINTS, + new IConstraint[] { Constraint.wrap(new NEConstant(d, new Constant<String>("Leon"))) }) +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + subQuery = join3Op; + } + + final PipelineOp joinGroup1Op = new SubqueryHashJoinOp(new BOp[]{join1Op}, + new NV(Predicate.Annotations.BOP_ID, joinGroup1),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(SubqueryOp.Annotations.SUBQUERY, subQuery),// +// new NV(BOp.Annotations.CONTROLLER,true)// +// new NV(BOp.Annotations.EVALUATION_CONTEXT, +// BOpEvaluationContext.CONTROLLER)// + // join is optional. + new NV(SubqueryOp.Annotations.OPTIONAL, true)// +// // optional target is the same as the default target. 
+// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + final PipelineOp sliceOp = new SliceOp(// + new BOp[]{joinGroup1Op}, + NV.asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, sliceId),// + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + new NV(PipelineOp.Annotations.SHARED_STATE,true),// + })); + + final PipelineOp query = sliceOp; + + // start the query. + final UUID queryId = UUID.randomUUID(); + final IChunkMessage<IBindingSet> initialChunkMessage; + { + + final IBindingSet initialBindings = new HashBindingSet(); + +// initialBindings.set(Var.var("x"), new Constant<String>("Mary")); + + initialChunkMessage = new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, startId,// + -1, // partitionId + newBindingSetIterator(initialBindings)); + } + final IRunningQuery runningQuery = queryEngine.eval(queryId, query, + initialChunkMessage); + + // verify solutions. + { + + // the expected solutions. + final IBindingSet[] expected = new IBindingSet[] {// + // two solutions where the optional join succeeds. 
+ new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + // plus anything we read from the first access path which did not + // pass the optional join + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Mary"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Brad"), + new Constant<String>("Leon") }// + ) + }; + + assertSameSolutionsAnyOrder(expected, + new Dechunkerator<IBindingSet>(runningQuery.iterator())); + + } + + // Wait until the query is done. + runningQuery.get(); + final Map<Integer, BOpStats> statsMap = runningQuery.getStats(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(4, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + } + + /** + * Unit test for optional join group with a filter on a variable outside the + * optional join group. Three joins are used and target a {@link SliceOp}. + * The 2nd and 3rd joins are embedded in an {@link SubqueryOp}. The + * optional join group contains a filter that uses a variable outside the + * optional join group.
+ * <P> + * The query takes the form: + * + * <pre> + * (a b) + * optional { + * (b c) + * (c d) + * filter(a != Paul) + * } + * </pre> + * + * The (a b) tail will match everything in the knowledge base. The join + * group takes us two hops out from ?b. There should be two solutions that + * succeed the optional join group: + * + * <pre> + * (john mary brad fred) + * (john mary brad leon) + * </pre> + * + * and six more that don't succeed the optional join group: + * + * <pre> + * (paul mary) * + * (paul brad) * + * (john brad) + * (mary brad) + * (brad fred) + * (brad leon) + * </pre> + * + * In the cases marked with a <code>*</code>, ?a is bound to Paul even + * though there is a filter that specifically prohibits a = Paul. This is + * because the filter is inside the optional join group, which means that + * solutions can still include a = Paul, but the optional join group should + * not run in that case. + */ + public void test_query_optionals_filter2() throws Exception { + + // main query + final int startId = 1; + final int joinId1 = 2; + final int predId1 = 3; // (a,b) + final int condId = 4; // (a != Paul) + f... [truncated message content] |