From: <tho...@us...> - 2011-01-03 14:41:44
|
Revision: 4047 http://bigdata.svn.sourceforge.net/bigdata/?rev=4047&view=rev Author: thompsonbry Date: 2011-01-03 14:41:36 +0000 (Mon, 03 Jan 2011) Log Message: ----------- Added an OptionalJoinGroup operator. It issues a subquery for each binding set presented to the operator. If the subquery produces any solutions, then they are copied to the default sink. Otherwise the binding set presented to the operator is copied to the default sink. This provides optional semantics for the group. Modified the optional join group test suite to use the OptionalJoinGroup operator. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngineOptionalJoins.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/OptionalJoinGroup.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestOptionalJoinGroup.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-01-02 22:49:27 UTC (rev 4046) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-01-03 14:41:36 UTC (rev 4047) @@ -136,6 +136,12 @@ * <p> * The value of the {@link #CONDITIONAL_GROUP} is an {@link Integer} * which uniquely identifies the group within the query. + * + * @deprecated The binding set stack push/pop mechanisms are not + * sufficient to support optional join groups. This + * annotation will be removed unless it proves valuable for + * marking the elements of a join group, in which case the + * javadoc needs to be updated. 
*/ String CONDITIONAL_GROUP = PipelineOp.class.getName() + ".conditionalGroup"; @@ -158,6 +164,10 @@ * * @see #CONDITIONAL_GROUP * @see #ALT_SINK_REF + * + * @deprecated The binding set stack push/pop mechanisms are not + * sufficient to support optional join groups. This + * annotation will be removed. */ String ALT_SINK_GROUP = PipelineOp.class.getName() + ".altSinkGroup"; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2011-01-02 22:49:27 UTC (rev 4046) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2011-01-03 14:41:36 UTC (rev 4047) @@ -55,8 +55,9 @@ * executed independently. By default, the subqueries are run with unlimited * parallelism. * <p> - * Note: This operator must on the query controller. The - * {@link PipelineOp.Annotations#SINK_REF} of each child operand should be + * Note: This operator must execute on the query controller. + * <p> + * The {@link PipelineOp.Annotations#SINK_REF} of each child operand should be * overridden to specify the parent of the this operator. If you fail to do * this, then the intermediate results of the subqueries will be routed to this * operator, which DOES NOT pass them on. This may cause unnecessary network @@ -193,10 +194,10 @@ this.latch = new CountDownLatch(controllerOp.arity()); /* - * Create FutureTasks for each subquery. The futures are submitted - * to the Executor yet. That happens in call(). By deferring the - * evaluation until call() we gain the ability to cancel all - * subqueries if any subquery fails. + * Create FutureTasks for each subquery. The futures are not + * submitted to the Executor yet. That happens in call(). 
By + * deferring the evaluation until call() we gain the ability to + * cancel all subqueries if any subquery fails. */ for (BOp op : controllerOp.args()) { Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/OptionalJoinGroup.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/OptionalJoinGroup.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/OptionalJoinGroup.java 2011-01-03 14:41:36 UTC (rev 4047) @@ -0,0 +1,418 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 18, 2010 + */ + +package com.bigdata.bop.controller; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.engine.IRunningQuery; +import com.bigdata.bop.engine.LocalChunkMessage; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; +import com.bigdata.util.concurrent.LatchedExecutor; + +/** + * For each binding set presented, this operator executes a subquery. Any + * solutions produced by the subquery are copied to the default sink. If no + * solutions are produced, then the original binding set is copied to the + * default sink (optional join semantics). Each subquery is run as a separate + * query but is linked to the parent query in which the operator is being evaluated. + * + * FIXME Is this true?: "This operator must execute on the query controller." For an + * optional join group in scale-out, we need to concentrate the solutions back + * to the controller if this is true. If it is not a requirement, then we can + * just issue the subquery from ANY node. + * + * FIXME Parallel evaluation of subqueries is not implemented. What is the + * appropriate parallelism for this operator? More parallelism should reduce + * latency but could increase the memory burden. Review this decision once we + * have the RWStore operating as a binding set buffer on the Java process heap. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class OptionalJoinGroup extends PipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends PipelineOp.Annotations { + + /** + * The subquery to be evaluated for each binding set presented to the + * {@link OptionalJoinGroup} (required). This should be a + * {@link PipelineOp}. + */ + String SUBQUERY = OptionalJoinGroup.class.getName() + ".subquery"; + + /** + * The maximum parallelism with which the subqueries will be evaluated + * (default {@value #DEFAULT_MAX_PARALLEL}). + */ + String MAX_PARALLEL = OptionalJoinGroup.class.getName() + + ".maxParallel"; + + int DEFAULT_MAX_PARALLEL = 1; + + } + + /** + * @see Annotations#MAX_PARALLEL + */ + public int getMaxParallel() { + return getProperty(Annotations.MAX_PARALLEL, + Annotations.DEFAULT_MAX_PARALLEL); + } + + /** + * Deep copy constructor. + */ + public OptionalJoinGroup(final OptionalJoinGroup op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * @param annotations + */ + public OptionalJoinGroup(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + +// if (!getEvaluationContext().equals(BOpEvaluationContext.CONTROLLER)) +// throw new IllegalArgumentException(Annotations.EVALUATION_CONTEXT +// + "=" + getEvaluationContext()); + + getRequiredProperty(Annotations.SUBQUERY); + + if (!getProperty(Annotations.CONTROLLER, Annotations.DEFAULT_CONTROLLER)) + throw new IllegalArgumentException(Annotations.CONTROLLER); + +// // The id of this operator (if any). 
+// final Integer thisId = (Integer)getProperty(Annotations.BOP_ID); +// +// for(BOp op : args) { +// +// final Integer sinkId = (Integer) op +// .getRequiredProperty(Annotations.SINK_REF); +// +// if(sinkId.equals(thisId)) +// throw new RuntimeException("Operand may not target ") +// +// } + + } + + public OptionalJoinGroup(final BOp[] args, NV... annotations) { + + this(args, NV.asMap(annotations)); + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new ControllerTask(this, context)); + + } + + /** + * Evaluates the arguments of the operator as subqueries. The arguments are + * evaluated in order. An {@link Executor} with limited parallelism to + * evaluate the arguments. If the controller operator is interrupted, then + * the subqueries are cancelled. If a subquery fails, then all subqueries + * are cancelled. + */ + private static class ControllerTask implements Callable<Void> { + + private final OptionalJoinGroup controllerOp; + private final BOpContext<IBindingSet> context; +// private final List<FutureTask<IRunningQuery>> tasks = new LinkedList<FutureTask<IRunningQuery>>(); +// private final CountDownLatch latch; + private final int nparallel; + private final PipelineOp subquery; + private final Executor executor; + + public ControllerTask(final OptionalJoinGroup controllerOp, final BOpContext<IBindingSet> context) { + + if (controllerOp == null) + throw new IllegalArgumentException(); + + if (context == null) + throw new IllegalArgumentException(); + + this.controllerOp = controllerOp; + + this.context = context; + + this.nparallel = controllerOp.getProperty(Annotations.MAX_PARALLEL, + Annotations.DEFAULT_MAX_PARALLEL); + + this.subquery = (PipelineOp) controllerOp + .getRequiredProperty(Annotations.SUBQUERY); + + this.executor = new LatchedExecutor(context.getIndexManager() + .getExecutorService(), nparallel); + +// this.latch = new CountDownLatch(controllerOp.arity()); + +// /* +// * Create 
FutureTasks for each subquery. The futures are not submitted +// * to the Executor yet. That happens in call(). By deferring the +// * evaluation until call() we gain the ability to cancel all +// * subqueries if any subquery fails. +// */ +// for (BOp op : controllerOp.args()) { +// +// /* +// * Task runs subquery and cancels all subqueries in [tasks] if +// * it fails. +// */ +// tasks.add(new FutureTask<IRunningQuery>(new SubqueryTask(op, +// context)) { +// /* +// * Hook future to count down the latch when the task is +// * done. +// */ +// public void run() { +// try { +// super.run(); +// } finally { +// latch.countDown(); +// } +// } +// }); +// +// } + + } + + /** + * Evaluate the subquery. + * + * @todo Support limited parallelism for each binding set read from the + * source. We will need to keep track of the running subqueries in + * order to wait on them before returning from this method and in + * order to cancel them if something goes wrong. + */ + public Void call() throws Exception { + + try { + + final IAsynchronousIterator<IBindingSet[]> sitr = context + .getSource(); + + // @todo test for interrupt/halted query? + while(sitr.hasNext()) { + + final IBindingSet[] chunk = sitr.next(); + + for(IBindingSet bset : chunk) { + + final FutureTask<IRunningQuery> ft = new FutureTask<IRunningQuery>( + new SubqueryTask(bset, subquery, context)); + + // run the subquery. + executor.execute(ft); + + // wait for the outcome. + ft.get(); + + } + + } + +// /* +// * Run subqueries with limited parallelism. +// */ +// for (FutureTask<IRunningQuery> ft : tasks) { +// executor.execute(ft); +// } +// +// /* +// * Wait for all subqueries to complete. +// */ +// latch.await(); +// +// /* +// * Get the futures, throwing out any errors. +// */ +// for (FutureTask<IRunningQuery> ft : tasks) +// ft.get(); + + // Now that we know the subqueries ran Ok, flush the sink. + context.getSink().flush(); + + // Done. 
+ return null; + + } finally { + +// // Cancel any tasks which are still running. +// for (FutureTask<IRunningQuery> ft : tasks) +// ft.cancel(true/* mayInterruptIfRunning */); + + context.getSource().close(); + + context.getSink().close(); + + if (context.getSink2() != null) + context.getSink2().close(); + + } + + } + + /** + * Run a subquery. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private class SubqueryTask implements Callable<IRunningQuery> { + + /** + * The evaluation context for the parent query. + */ + private final BOpContext<IBindingSet> parentContext; + + /** + * The source binding set. This will be copied to the output if + * there are no solutions for the subquery (optional join + * semantics). + */ + private final IBindingSet bset; + + /** + * The root operator for the subquery. + */ + private final BOp subQueryOp; + + public SubqueryTask(final IBindingSet bset, final BOp subQuery, + final BOpContext<IBindingSet> parentContext) { + + this.bset = bset; + + this.subQueryOp = subQuery; + + this.parentContext = parentContext; + + } + + public IRunningQuery call() throws Exception { + + IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = null; + try { + + final QueryEngine queryEngine = parentContext.getRunningQuery() + .getQueryEngine(); + +// final IRunningQuery runningQuery = queryEngine +// .eval(subQueryOp); + + final BOp startOp = BOpUtility.getPipelineStart(subQueryOp); + + final int startId = startOp.getId(); + + final UUID queryId = UUID.randomUUID(); + + // execute the subquery, passing in the source binding set. + final IRunningQuery runningQuery = queryEngine + .eval( + queryId, + (PipelineOp) subQueryOp, + new LocalChunkMessage<IBindingSet>( + queryEngine, + queryId, + startId, + -1 /* partitionId */, + new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { new IBindingSet[] { bset } }))); + + // Iterator visiting the subquery solutions. 
+ subquerySolutionItr = runningQuery.iterator(); + + // Copy solutions from the subquery to the query. + final long ncopied = BOpUtility.copy(subquerySolutionItr, + parentContext.getSink(), null/* sink2 */, + null/* constraints */, null/* stats */); + + // wait for the subquery. + runningQuery.get(); + + if (ncopied == 0L) { + + /* + * Since there were no solutions for the subquery, copy + * the original binding set to the default sink. + */ + parentContext.getSink().add(new IBindingSet[]{bset}); + + } + + // done. + return runningQuery; + + } catch (Throwable t) { + + /* + * If a subquery fails, then propagate the error to the + * parent and rethrow the first cause error out of the + * subquery. + */ + throw new RuntimeException(ControllerTask.this.context + .getRunningQuery().halt(t)); + + } finally { + + if (subquerySolutionItr != null) + subquerySolutionItr.close(); + + } + + } + + } // SubqueryTask + + } // ControllerTask + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/OptionalJoinGroup.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java 2011-01-02 22:49:27 UTC (rev 4046) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java 2011-01-03 14:41:36 UTC (rev 4047) @@ -77,6 +77,8 @@ // test STEPS // suite.addTestSuite(TestUnion.class); + suite.addTestSuite(TestOptionalJoinGroup.class); + // @todo test STAR (transitive closure). 
// suite.addTestSuite(TestStar.class); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestOptionalJoinGroup.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestOptionalJoinGroup.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestOptionalJoinGroup.java 2011-01-03 14:41:36 UTC (rev 4047) @@ -0,0 +1,1138 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 23, 2010 + */ + +package com.bigdata.bop.controller; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.TestCase2; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.IVariableOrConstant; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.Var; +import com.bigdata.bop.IPredicate.Annotations; +import com.bigdata.bop.ap.E; +import com.bigdata.bop.ap.Predicate; +import com.bigdata.bop.ap.R; +import com.bigdata.bop.bindingSet.ArrayBindingSet; +import com.bigdata.bop.bindingSet.HashBindingSet; +import com.bigdata.bop.bset.ConditionalRoutingOp; +import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.constraint.NEConstant; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.IChunkMessage; +import com.bigdata.bop.engine.IRunningQuery; +import com.bigdata.bop.engine.LocalChunkMessage; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.ITx; +import com.bigdata.journal.Journal; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; +import com.bigdata.striterator.ChunkedArrayIterator; +import com.bigdata.striterator.Dechunkerator; +import 
com.bigdata.striterator.ICloseableIterator; + +/** + * Test suite for handling of optional join groups during query evaluation + * against a local database instance. Optional join groups are handled using + * {@link IBindingSet#push()} when entering the join group and + * {@link IBindingSet#pop(boolean)} when exiting the join group. If the join + * group was successful for a given binding set, then <code>save:=true</code> is + * specified for {@link IBindingSet#pop(boolean)} and the applied bindings will + * be visible to the downstream consumer. Otherwise the bindings applied during + * the join group are simply discarded. + * + * <pre> + * -Dlog4j.configuration=bigdata/src/resources/logging/log4j.properties + * </pre> + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class TestOptionalJoinGroup extends TestCase2 { + + /** + * + */ + public TestOptionalJoinGroup() { + } + + /** + * @param name + */ + public TestOptionalJoinGroup(String name) { + super(name); + } + + @Override + public Properties getProperties() { + + final Properties p = new Properties(super.getProperties()); + + p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient + .toString()); + + return p; + + } + + static private final String namespace = "ns"; + private Journal jnl; + private QueryEngine queryEngine; + + public void setUp() throws Exception { + + jnl = new Journal(getProperties()); + + loadData(jnl); + + queryEngine = new QueryEngine(jnl); + + queryEngine.init(); + + } + + /** + * Create and populate relation in the {@link #namespace}. + */ + private void loadData(final Journal store) { + + // create the relation. + final R rel = new R(store, namespace, ITx.UNISOLATED, new Properties()); + rel.create(); + + // data to insert (in key order for convenience). 
+ final E[] a = {// + new E("Paul", "Mary"),// [0] + new E("Paul", "Brad"),// [1] + + new E("John", "Mary"),// [2] + new E("John", "Brad"),// [3] + + new E("Mary", "Brad"),// [4] + + new E("Brad", "Fred"),// [5] + new E("Brad", "Leon"),// [6] + }; + + // insert data (the records are not pre-sorted). + rel.insert(new ChunkedArrayIterator<E>(a.length, a, null/* keyOrder */)); + + // Do commit since not scale-out. + store.commit(); + + } + + public void tearDown() throws Exception { + + if (queryEngine != null) { + queryEngine.shutdownNow(); + queryEngine = null; + } + + if (jnl != null) { + jnl.destroy(); + jnl = null; + } + + } + + /** + * Return an {@link IAsynchronousIterator} that will read a single, + * empty {@link IBindingSet}. + * + * @param bindingSet + * the binding set. + */ + protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet bindingSet) { + + return new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { new IBindingSet[] { bindingSet } }); + + } + + /** + * Return an {@link IAsynchronousIterator} that will read a single, chunk + * containing all of the specified {@link IBindingSet}s. + * + * @param bindingSets + * the binding sets. + */ + protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet[] bindingSets) { + + return new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { bindingSets }); + + } + + /** + * Return an {@link IAsynchronousIterator} that will read a single, chunk + * containing all of the specified {@link IBindingSet}s. + * + * @param bindingSetChunks + * the chunks of binding sets. + */ + protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet[][] bindingSetChunks) { + + return new ThickAsynchronousIterator<IBindingSet[]>(bindingSetChunks); + + } + + /** + * Unit test for optional join group. Three joins are used and target a + * {@link SliceOp}. 
The 2nd and 3rd joins are embedded in an + * {@link OptionalJoinGroup}. + * <P> + * The optional join group takes the form: + * + * <pre> + * (a b) + * optional { + * (b c) + * (c d) + * } + * </pre> + * + * The (a b) tail will match everything in the knowledge base. The join + * group takes us two hops out from ?b. There should be four solutions that + * succeed the optional join group: + * + * <pre> + * (paul mary brad fred) + * (paul mary brad leon) + * (john mary brad fred) + * (john mary brad leon) + * </pre> + * + * and five more that don't succeed the optional join group: + * + * <pre> + * (paul brad) * + * (john brad) * + * (mary brad) * + * (brad fred) + * (brad leon) + * </pre> + * + * In the cases marked with a <code>*</code>, ?c will become temporarily + * bound to fred and leon (since brad knows fred and leon), but the (c d) + * tail will fail since fred and leon don't know anyone else. At this point, + * the ?c binding must be removed from the solution. + */ + public void test_query_join2_optionals() throws Exception { + + // main query + final int startId = 1; // + final int joinId1 = 2; // : base join group. 
+ final int predId1 = 3; // (a b) + final int joinGroup1 = 9; + final int sliceId = 8; // + + // subquery + final int joinId2 = 4; // : joinGroup1 + final int predId2 = 5; // (b c) + final int joinId3 = 6; // : joinGroup1 + final int predId3 = 7; // (c d) + + final IVariable<?> a = Var.var("a"); + final IVariable<?> b = Var.var("b"); + final IVariable<?> c = Var.var("c"); + final IVariable<?> d = Var.var("d"); + + final PipelineOp startOp = new StartOp(new BOp[] {}, + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + new NV(SliceOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final Predicate<?> pred1Op = new Predicate<E>( + new IVariableOrConstant[] { a, b }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId1),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred2Op = new Predicate<E>( + new IVariableOrConstant[] { b, c }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId2),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred3Op = new Predicate<E>( + new IVariableOrConstant[] { c, d }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId3),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final PipelineOp join1Op = new PipelineJoin<E>(// + new BOp[]{startOp},// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + new NV(PipelineJoin.Annotations.PREDICATE,pred1Op)); + + final PipelineOp subQuery; + { + final PipelineOp join2Op = new PipelineJoin<E>(// + new BOp[] { /*join1Op*/ },// + new NV(Predicate.Annotations.BOP_ID, joinId2),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new 
NV(PipelineJoin.Annotations.PREDICATE, pred2Op)// +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + final PipelineOp join3Op = new PipelineJoin<E>(// + new BOp[] { join2Op },// + new NV(Predicate.Annotations.BOP_ID, joinId3),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred3Op)// +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + subQuery = join3Op; + } + + final PipelineOp joinGroup1Op = new OptionalJoinGroup(new BOp[]{join1Op}, + new NV(Predicate.Annotations.BOP_ID, joinGroup1),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(OptionalJoinGroup.Annotations.SUBQUERY, subQuery),// + new NV(BOp.Annotations.CONTROLLER,true)// +// new NV(BOp.Annotations.EVALUATION_CONTEXT, +// BOpEvaluationContext.CONTROLLER)// +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + final PipelineOp sliceOp = new SliceOp(// + new BOp[]{joinGroup1Op}, + NV.asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, sliceId),// + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final PipelineOp query = sliceOp; + + // start the query. 
+ final UUID queryId = UUID.randomUUID(); + final IChunkMessage<IBindingSet> initialChunkMessage; + { + + final IBindingSet initialBindings = new HashBindingSet(); + +// initialBindings.set(Var.var("x"), new Constant<String>("Mary")); + + initialChunkMessage = new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, startId,// + -1, // partitionId + newBindingSetIterator(initialBindings)); + } + final IRunningQuery runningQuery = queryEngine.eval(queryId, query, + initialChunkMessage); + + // verify solutions. + { + + // the expected solutions. + final IBindingSet[] expected = new IBindingSet[] {// + // four solutions where the optional join succeeds. + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Leon") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Leon") }// + ), + // plus anything we read from the first access path which did not + // pass the optional join + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new 
Constant<String>("Mary"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Brad"), + new Constant<String>("Leon") }// + ) + }; + + /* + * junit.framework.AssertionFailedError: Iterator will deliver too + * many objects: reminder(3)=[{ a=John, b=Brad }, { a=Mary, b=Brad + * }, { a=Paul, b=Brad }]. + */ + assertSameSolutionsAnyOrder(expected, + new Dechunkerator<IBindingSet>(runningQuery.iterator())); + + } + + // Wait until the query is done. + runningQuery.get(); + final Map<Integer, BOpStats> statsMap = runningQuery.getStats(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(4, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + } + + /** + * Unit test for optional join group with a filter. Three joins are used and + * target a {@link SliceOp}. The 2nd and 3rd joins are embedded in an + * optional join group. The optional join group contains a filter. + * <p> + * The optional join group takes the form: + * + * <pre> + * (a b) + * optional { + * (b c) + * (c d) + * filter(d != Leon) + * } + * </pre> + * + * The (a b) tail will match everything in the knowledge base. The join + * group takes us two hops out from ?b. There should be two solutions that + * succeed the optional join group: + * + * <pre> + * (paul mary brad fred) + * (john mary brad fred) + * </pre> + * + * and five more that don't succeed the optional join group: + * + * <pre> + * (paul brad) * + * (john brad) * + * (mary brad) * + * (brad fred) + * (brad leon) + * </pre> + * + * In the cases marked with a <code>*</code>, ?c will become temporarily + * bound to fred and leon (since brad knows fred and leon), but the (c d) + * tail will fail since fred and leon don't know anyone else. 
At this point, + * the ?c binding must be removed from the solution. + * <p> + * The filter (d != Leon) will prune the two solutions: + * + * <pre> + * (paul mary brad leon) + * (john mary brad leon) + * </pre> + * + * since ?d is bound to Leon in those cases. + */ + public void test_query_optionals_filter() throws Exception { + + // main query + final int startId = 1; + final int joinId1 = 2; // + final int predId1 = 3; // (a,b) + final int joinGroup1 = 9; + final int sliceId = 8; + + // subquery + final int joinId2 = 4; // : group1 + final int predId2 = 5; // (b,c) + final int joinId3 = 6; // : group1 + final int predId3 = 7; // (c,d) + + + final IVariable<?> a = Var.var("a"); + final IVariable<?> b = Var.var("b"); + final IVariable<?> c = Var.var("c"); + final IVariable<?> d = Var.var("d"); + + final PipelineOp startOp = new StartOp(new BOp[] {}, + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + new NV(SliceOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final Predicate<?> pred1Op = new Predicate<E>( + new IVariableOrConstant[] { a, b }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId1),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred2Op = new Predicate<E>( + new IVariableOrConstant[] { b, c }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId2),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred3Op = new Predicate<E>( + new IVariableOrConstant[] { c, d }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId3),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final PipelineOp join1Op = new 
PipelineJoin<E>(// + new BOp[]{startOp},// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + new NV(PipelineJoin.Annotations.PREDICATE,pred1Op)); + + final PipelineOp subQuery; + { + final PipelineOp join2Op = new PipelineJoin<E>(// + new BOp[] { /*join1Op*/ },// + new NV(Predicate.Annotations.BOP_ID, joinId2),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred2Op)// +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + final PipelineOp join3Op = new PipelineJoin<E>(// + new BOp[] { join2Op },// + new NV(Predicate.Annotations.BOP_ID, joinId3),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred3Op),// + // constraint d != Leon + new NV(PipelineJoin.Annotations.CONSTRAINTS, + new IConstraint[] { new NEConstant(d, new Constant<String>("Leon")) }) +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + subQuery = join3Op; + } + + final PipelineOp joinGroup1Op = new OptionalJoinGroup(new BOp[]{join1Op}, + new NV(Predicate.Annotations.BOP_ID, joinGroup1),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(OptionalJoinGroup.Annotations.SUBQUERY, subQuery),// + new NV(BOp.Annotations.CONTROLLER,true)// +// new NV(BOp.Annotations.EVALUATION_CONTEXT, +// BOpEvaluationContext.CONTROLLER)// +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. 
+// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + final PipelineOp sliceOp = new SliceOp(// + new BOp[]{joinGroup1Op}, + NV.asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, sliceId),// + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final PipelineOp query = sliceOp; + + // start the query. + final UUID queryId = UUID.randomUUID(); + final IChunkMessage<IBindingSet> initialChunkMessage; + { + + final IBindingSet initialBindings = new HashBindingSet(); + +// initialBindings.set(Var.var("x"), new Constant<String>("Mary")); + + initialChunkMessage = new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, startId,// + -1, // partitionId + newBindingSetIterator(initialBindings)); + } + final IRunningQuery runningQuery = queryEngine.eval(queryId, query, + initialChunkMessage); + + // verify solutions. + { + + // the expected solutions. + final IBindingSet[] expected = new IBindingSet[] {// + // two solutions where the optional join succeeds. 
+ new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + // plus anything we read from the first access path which did not + // pass the optional join + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Mary"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Brad"), + new Constant<String>("Leon") }// + ) + }; + + assertSameSolutionsAnyOrder(expected, + new Dechunkerator<IBindingSet>(runningQuery.iterator())); + + } + + // Wait until the query is done. + runningQuery.get(); + final Map<Integer, BOpStats> statsMap = runningQuery.getStats(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(4, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + } + + /** + * Unit test for optional join group with a filter on a variable outside the + * optional join group. Three joins are used and target a {@link SliceOp}. + * The 2nd and 3rd joins are embedded in an {@link OptionalJoinGroup}. The + * optional join group contains a filter that uses a variable outside the + * optional join group. 
+ * <P> + * The query takes the form: + * + * <pre> + * (a b) + * optional { + * (b c) + * (c d) + * filter(a != Paul) + * } + * </pre> + * + * The (a b) tail will match everything in the knowledge base. The join + * group takes us two hops out from ?b. There should be two solutions that + * succeed the optional join group: + * + * <pre> + * (john mary brad fred) + * (john mary brad leon) + * </pre> + * + * and six more that don't succeed the optional join group: + * + * <pre> + * (paul mary) * + * (paul brad) * + * (john brad) + * (mary brad) + * (brad fred) + * (brad leon) + * </pre> + * + * In the cases marked with a <code>*</code>, ?a is bound to Paul even + * though there is a filter that specifically prohibits a = Paul. This is + * because the filter is inside the optional join group, which means that + * solutions can still include a = Paul, but the optional join group should + * not run in that case. + */ + public void test_query_optionals_filter2() throws Exception { + + // main query + final int startId = 1; + final int joinId1 = 2; + final int predId1 = 3; // (a,b) + final int condId = 4; // (a != Paul) + final int joinGroup1 = 10; + final int sliceId = 9; + + // subquery (iff condition is satisfied) + final int joinId2 = 5; // : group1 + final int predId2 = 6; // (b,c) + final int joinId3 = 7; // : group1 + final int predId3 = 8; // (c,d) + + final IVariable<?> a = Var.var("a"); + final IVariable<?> b = Var.var("b"); + final IVariable<?> c = Var.var("c"); + final IVariable<?> d = Var.var("d"); + +// final Integer joinGroup1 = Integer.valueOf(1); + + /* + * Not quite sure how to write this one. I think it probably goes + * something like this: + * + * 1. startOp + * 2. join1Op(a b) + * 3. conditionalRoutingOp( if a = Paul then goto sliceOp ) + * 4. join2Op(b c) + * 5. join3Op(c d) + * 6. 
sliceOp + */ + + final PipelineOp startOp = new StartOp(new BOp[] {}, + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + new NV(SliceOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final Predicate<?> pred1Op = new Predicate<E>( + new IVariableOrConstant[] { a, b }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId1),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred2Op = new Predicate<E>( + new IVariableOrConstant[] { b, c }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId2),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred3Op = new Predicate<E>( + new IVariableOrConstant[] { c, d }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId3),// + new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final PipelineOp join1Op = new PipelineJoin<E>(// + new BOp[]{startOp},// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + new NV(PipelineJoin.Annotations.PREDICATE,pred1Op)); + + final IConstraint condition = new NEConstant(a, new Constant<String>("Paul")); + + final ConditionalRoutingOp condOp = new ConditionalRoutingOp(new BOp[]{join1Op}, + NV.asMap(new NV[]{// + new NV(BOp.Annotations.BOP_ID,condId), + new NV(PipelineOp.Annotations.SINK_REF, joinGroup1), // a != Paul + new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId), // a == Paul + new NV(ConditionalRoutingOp.Annotations.CONDITION, condition), + })); + + final PipelineOp subQuery; + { + final PipelineOp join2Op = new PipelineJoin<E>(// + new BOp[] { /*condOp*/ },// + new NV(Predicate.Annotations.BOP_ID, joinId2),// +// new 
NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred2Op)// +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + final PipelineOp join3Op = new PipelineJoin<E>(// + new BOp[] { join2Op },// + new NV(Predicate.Annotations.BOP_ID, joinId3),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred3Op)// +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + subQuery = join3Op; + } + + final PipelineOp joinGroup1Op = new OptionalJoinGroup(new BOp[]{condOp}, + new NV(Predicate.Annotations.BOP_ID, joinGroup1),// +// new NV(PipelineOp.Annotations.CONDITIONAL_GROUP, joinGroup1),// + new NV(OptionalJoinGroup.Annotations.SUBQUERY, subQuery),// + new NV(BOp.Annotations.CONTROLLER,true)// +// new NV(BOp.Annotations.EVALUATION_CONTEXT, +// BOpEvaluationContext.CONTROLLER)// +// // join is optional. +// new NV(PipelineJoin.Annotations.OPTIONAL, true),// +// // optional target is the same as the default target. +// new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId) + ); + + final PipelineOp sliceOp = new SliceOp(// + new BOp[]{joinGroup1Op}, + NV.asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, sliceId),// + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final PipelineOp query = sliceOp; + + // start the query. 
+ final UUID queryId = UUID.randomUUID(); + final IChunkMessage<IBindingSet> initialChunkMessage; + { + + final IBindingSet initialBindings = new HashBindingSet(); + +// initialBindings.set(Var.var("x"), new Constant<String>("Mary")); + + initialChunkMessage = new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, startId,// + -1, // partitionId + newBindingSetIterator(initialBindings)); + } + final IRunningQuery runningQuery = queryEngine.eval(queryId, query, + initialChunkMessage); + + // verify solutions. + { + + // the expected solutions. + final IBindingSet[] expected = new IBindingSet[] {// + // two solutions where the optional join succeeds. + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Leon") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b, c, d },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary"), + new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + // plus anything we read from the first access path which did not + // pass the optional join + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Mary") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Mary"), + new Constant<String>("Brad") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new Constant<String>("Brad"), + new Constant<String>("Fred") }// + ), + new ArrayBindingSet(// + new IVariable[] { a, b },// + new IConstant[] { new 
Constant<String>("Brad"), + new Constant<String>("Leon") }// + ) + }; + + assertSameSolutionsAnyOrder(expected, + new Dechunkerator<IBindingSet>(runningQuery.iterator())); + + } + + // Wait until the query is done. + runningQuery.get(); + final Map<Integer, BOpStats> statsMap = runningQuery.getStats(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(5, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + } + + /** + * Verify the expected solutions. + * + * @param expected + * @param itr + */ + static public void assertSameSolutions(final IBindingSet[] expected, + final IAsynchronousIterator<IBindingSet[]> itr) { + try { + int n = 0; + while (itr.hasNext()) { + final IBindingSet[... [truncated message content] |