From: <tho...@us...> - 2010-10-06 14:44:49
|
Revision: 3740 http://bigdata.svn.sourceforge.net/bigdata/?rev=3740&view=rev Author: thompsonbry Date: 2010-10-06 14:44:41 +0000 (Wed, 06 Oct 2010) Log Message: ----------- Added support for PipelineOp.Annotations#SINK_REF (to override the default sink in support of routing around a union operator). Modified RunningQuery to permit the default sink and the optional sink to target the same operator. Modified the FederatedQueryEngine to use a thread pool to materialize chunks. Added unit tests for optional joins for the query engine and federated query engine. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -28,17 +28,11 @@ package com.bigdata.bop.bset; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; -import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; +import com.bigdata.bop.IBindingSet; import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.join.PipelineJoin; -import com.bigdata.rdf.rules.TMUtility; -import com.bigdata.relation.RelationFusedView; -import 
com.bigdata.util.concurrent.Haltable; /** * UNION(ops)[maxParallel(default all)] @@ -49,19 +43,30 @@ * and may be executed independently. By default, the subqueries are run with * unlimited parallelism. * <p> - * UNION is useful when independent queries are evaluated and their outputs are - * merged. Outputs from the UNION operator flow to the parent operator and will - * be mapped across shards or nodes as appropriate for the parent. UNION runs on - * the query controller. In order to avoid routing intermediate results through - * the controller, the {@link PipelineOp.Annotations#SINK_REF} of each - * child operand should be overridden to specify the parent of the UNION - * operator. + * Note: UNION runs on the query controller. The + * {@link PipelineOp.Annotations#SINK_REF} of each child operand should be + * overridden to specify the parent of the UNION operator, thereby routing + * around the UNION operator itself. If you fail to do this, then the + * intermediate results of the subqueries will be routed through the UNION + * operator on the query controller. * <p> - * UNION can not be used when the intermediate results must be routed into the - * subqueries. However, a {@link Tee} pattern may help in such cases. For - * example, a {@link Tee} may be used to create a union of pipeline joins for - * two access paths during truth maintenance. + * UNION can not be used when intermediate results from other computations must + * be routed into subqueries. However, a {@link Tee} pattern may help in such + * cases. For example, a {@link Tee} may be used to create a union of pipeline + * joins for two access paths during truth maintenance. + * <p> + * For example: * + * <pre> + * UNION([a,b,c],{}) + * </pre> + * + * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each + * subquery will be initialized with a single empty {@link IBindingSet}. 
The + * output of those subqueries will be routed to the UNION operator (their + * parent) unless the subqueries explicitly override this behavior using + * {@link PipelineOp.Annotations#SINK_REF}. + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -393,6 +393,12 @@ } + protected void execute(final Runnable r) { + + localIndexManager.getExecutorService().execute(r); + + } + /** * Runnable submits chunks available for evaluation against running queries. * @@ -438,7 +444,7 @@ if (log.isDebugEnabled()) log.debug("Running chunk: " + chunk); // execute task. - localIndexManager.getExecutorService().execute(ft); + execute(ft); } catch (RejectedExecutionException ex) { // shutdown of the pool (should be an unbounded // pool). Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -873,8 +873,7 @@ * target it with a message. (The sink will be null iff there is no * parent for this operator.) */ - sinkId = p == null ? null : (Integer) p - .getRequiredProperty(PipelineOp.Annotations.BOP_ID); + sinkId = getEffectiveDefaultSink(bop, p); // altSink (null when not specified). 
altSinkId = (Integer) op @@ -889,12 +888,12 @@ + bop); } - if (sinkId != null && altSinkId != null - && sinkId.intValue() == altSinkId.intValue()) { - throw new RuntimeException( - "The primary and alternative sink may not be the same operator: " - + bop); - } +// if (sinkId != null && altSinkId != null +// && sinkId.intValue() == altSinkId.intValue()) { +// throw new RuntimeException( +// "The primary and alternative sink may not be the same operator: " +// + bop); +// } /* * Setup the BOpStats object. For some operators, e.g., SliceOp, @@ -932,6 +931,38 @@ } /** + * Return the effective default sink. + * + * @param bop + * The operator. + * @param p + * The parent of that operator, if any. + */ + private Integer getEffectiveDefaultSink(final BOp bop, final BOp p) { + + if (bop == null) + throw new IllegalArgumentException(); + + Integer sink; + + // Explicitly specified sink? + sink = (Integer) bop.getProperty(PipelineOp.Annotations.SINK_REF); + + if (sink == null) { + if (p == null) { + // No parent, so no sink. + return null; + } + // The parent is the sink. + sink = (Integer) p + .getRequiredProperty(PipelineOp.Annotations.BOP_ID); + } + + return sink; + + } + + /** + * Evaluate the {@link IChunkMessage}.
*/ public void run() { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -31,13 +31,10 @@ import java.nio.ByteBuffer; import java.rmi.RemoteException; import java.util.UUID; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -99,23 +96,23 @@ */ private final IQueryClient clientProxy; - /** - * A queue of {@link IChunkMessage}s which needs to have their data - * materialized so an operator can consume those data on this node. - * This queue is drained by the {@link MaterializeChunksTask}. - */ - final private BlockingQueue<IChunkMessage<?>> chunkMaterializationQueue = new LinkedBlockingQueue<IChunkMessage<?>>(); +// /** +// * A queue of {@link IChunkMessage}s which needs to have their data +// * materialized so an operator can consume those data on this node. +// * This queue is drained by the {@link MaterializeChunksTask}. +// */ +// final private BlockingQueue<IChunkMessage<?>> chunkMaterializationQueue = new LinkedBlockingQueue<IChunkMessage<?>>(); /** - * The service on which we run {@link MaterializeChunksTask}. This is + * The service used to accept {@link IChunkMessage} for evaluation. This is * started by {@link #init()}. 
*/ - private final AtomicReference<ExecutorService> materializeChunksService = new AtomicReference<ExecutorService>(); + private final AtomicReference<ExecutorService> acceptTaskService = new AtomicReference<ExecutorService>(); - /** - * The {@link Future} for the task draining the {@link #chunkMaterializationQueue}. - */ - private final AtomicReference<FutureTask<Void>> materializeChunksFuture = new AtomicReference<FutureTask<Void>>(); +// /** +// * The {@link Future} for the task draining the {@link #chunkMaterializationQueue}. +// */ +// private final AtomicReference<FutureTask<Void>> acceptMessageTaskFuture = new AtomicReference<FutureTask<Void>>(); @Override public UUID getServiceUUID() { @@ -229,17 +226,16 @@ /* * The proxy for this query engine when used as a query controller. * + * Note: DGC is relied on to clean up the exported proxy when the + * query engine dies. * - * Should the data services expose their query engine in this - * manner? - * - * @todo We need to unexport the proxy as well when the service is - * shutdown. This should follow the same pattern as DataService -> - * DataServer. E.g., a QueryEngineServer class. + * @todo There should be an explicit "QueryEngineServer" which is + * used as the front end for SPARQL queries. It should have an + * explicitly configured Exporter for its proxy. 
*/ this.clientProxy = (IQueryClient) ((JiniFederation<?>) fed) - .getProxy(this, false/* enableDGC */); + .getProxy(this, true/* enableDGC */); } else { @@ -275,26 +271,11 @@ public void init() { super.init(); - - final FutureTask<Void> ft = new FutureTask<Void>( - new MaterializeChunksTask(), (Void) null); - - if (materializeChunksFuture.compareAndSet(null/* expect */, ft)) { - materializeChunksService.set(Executors - .newSingleThreadExecutor(new DaemonThreadFactory( - FederatedQueryEngine.class - + ".materializeChunksService"))); + acceptTaskService.set(Executors + .newCachedThreadPool(new DaemonThreadFactory( + FederatedQueryEngine.class + ".acceptService"))); -// getIndexManager().getExecutorService().execute(ft); - materializeChunksService.get().execute(ft); - - } else { - - throw new IllegalStateException("Already running"); - - } - } /** @@ -305,21 +286,14 @@ @Override protected void didShutdown() { - // stop materializing chunks. - final Future<?> f = materializeChunksFuture.get(); - if (f != null) { - f.cancel(true/* mayInterruptIfRunning */); - } - - // stop the service on which we ran the MaterializeChunksTask. - final ExecutorService s = materializeChunksService.get(); + // stop the service which is accepting messages. + final ExecutorService s = acceptTaskService.get(); if (s != null) { s.shutdownNow(); } // Clear the references. - materializeChunksFuture.set(null); - materializeChunksService.set(null); + acceptTaskService.set(null); } @@ -331,63 +305,50 @@ @Override public void shutdownNow() { - // stop materializing chunks. - final Future<?> f = materializeChunksFuture.get(); - if (f != null) - f.cancel(true/* mayInterruptIfRunning */); + // stop the service which is accepting messages. + final ExecutorService s = acceptTaskService.get(); + if (s != null) { + s.shutdownNow(); + } + // Clear the references. + acceptTaskService.set(null); + super.shutdownNow(); } /** - * Runnable materializes chunks and makes them available for further - * processing. 
+ * Materialize an {@link IChunkMessage} for processing and place it on the + * queue of accepted messages. * - * @todo multiple threads for materializing chunks, not just one. can - * be multiple {@link MaterializeChunksTask}s running. + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> */ - private class MaterializeChunksTask implements Runnable { + private class MaterializeMessageTask implements Runnable { + private final IChunkMessage<?> msg; + + public MaterializeMessageTask(final IChunkMessage<?> msg) { + this.msg = msg; + } + public void run() { - if(log.isInfoEnabled()) - log.info("running: " + this); - while (true) { - try { - final IChunkMessage<?> msg = chunkMaterializationQueue.take(); - if(log.isDebugEnabled()) - log.debug("msg=" + msg); - try { - if(!accept(msg)) { - if(log.isDebugEnabled()) - log.debug("dropping: " + msg); - continue; - } - if(log.isDebugEnabled()) - log.debug("accepted: " + msg); - /* - * @todo The type warning here is because the rest of - * the API does not know what to do with messages for - * chunks other than IBindingSet[], e.g., IElement[], - * etc. 
- */ - FederatedQueryEngine.this - .acceptChunk((IChunkMessage) msg); - } catch(Throwable t) { - if(InnerCause.isInnerCause(t, InterruptedException.class)) { - log.warn("Interrupted."); - return; - } - throw new RuntimeException(t); - } - } catch (InterruptedException e) { + try { + if (!accept(msg)) { + if (log.isDebugEnabled()) + log.debug("dropping: " + msg); + return; + } + if (log.isDebugEnabled()) + log.debug("accepted: " + msg); + FederatedQueryEngine.this.acceptChunk((IChunkMessage) msg); + } catch (Throwable t) { + if (InnerCause.isInnerCause(t, InterruptedException.class)) { log.warn("Interrupted."); return; - } catch (Throwable ex) { - // log and continue - log.error(ex, ex); - continue; } + throw new RuntimeException(t); } } @@ -410,59 +371,30 @@ if (q == null) { - /* - * This code path handles the message the first time a chunk is - * observed on a node for a query. Since we do not broadcast the - * query to all nodes, the node has to resolve the query from the - * query controller. - * - * @todo Track recently terminated queries and do not recreate them. - */ - // true iff this is the query controller final boolean isController = getServiceUUID().equals( msg.getQueryController().getServiceUUID()); - - if(isController) { + + if (isController) { /* * @todo This would indicate that the query had been * concurrently terminated and cleared from the set of - * runningQueries and that we were not retaining metadata about - * queries which had been terminated. + * runningQueries and that we were not retaining metadata + * about queries which had been terminated. */ throw new AssertionError( "Query not running on controller: thisService=" + getServiceUUID() + ", msg=" + msg); } - /* - * Request the query from the query controller (RMI). - * - * @todo RMI is too expensive. Apply a memoizer pattern to avoid - * race conditions. 
- */ - final PipelineOp query = msg.getQueryController() - .getQuery(msg.getQueryId()); - - q = newRunningQuery(FederatedQueryEngine.this, queryId, - false/* controller */, msg.getQueryController(), query); - - final RunningQuery tmp = runningQueries.putIfAbsent(queryId, q); + // Get the query declaration from the query controller. + q = getDeclaredQuery(queryId); - if(tmp != null) { - - // another thread won this race. - q = (FederatedRunningQuery) tmp; - - } - } -// if(q == null) -// throw new RuntimeException(ERR_QUERY_NOT_RUNNING + queryId); - if (!q.isCancelled() && !msg.isMaterialized()) { + // materialize the chunk for this message. msg.materialize(q); } @@ -470,9 +402,44 @@ return !q.isCancelled(); } - - } // MaterializeChunksTask + /** + * This code path handles the message the first time a chunk is observed + * on a node for a query. Since we do not broadcast the query to all + * nodes, the node has to resolve the query from the query controller. + * + * @throws RemoteException + * + * @todo Track recently terminated queries and do not recreate them. + */ + private FederatedRunningQuery getDeclaredQuery(final UUID queryId) + throws RemoteException { + + /* + * Request the query from the query controller (RMI). + */ + final PipelineOp query = msg.getQueryController().getQuery( + msg.getQueryId()); + + FederatedRunningQuery q = newRunningQuery( + FederatedQueryEngine.this, queryId, false/* controller */, + msg.getQueryController(), query); + + final RunningQuery tmp = runningQueries.putIfAbsent(queryId, q); + + if (tmp != null) { + + // another thread won this race. + q = (FederatedRunningQuery) tmp; + + } + + return q; + + } + + } + public void declareQuery(final IQueryDecl queryDecl) { final UUID queryId = queryDecl.getQueryId(); @@ -489,11 +456,22 @@ if (msg == null) throw new IllegalArgumentException(); + if(log.isDebugEnabled()) + log.debug("msg=" + msg); + assertRunning(); - // queue up message to be materialized or otherwise handled later. 
- chunkMaterializationQueue.add(msg); + /* + * Schedule task to materialize or otherwise handle the message. + */ + + final Executor s = acceptTaskService.get(); + if (s == null) + throw new RuntimeException("Not running"); + + s.execute(new MaterializeMessageTask(msg)); + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -44,9 +44,7 @@ import com.bigdata.bop.ArrayBindingSet; import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpEvaluationContext; -import com.bigdata.bop.PipelineOp; import com.bigdata.bop.Constant; import com.bigdata.bop.HashBindingSet; import com.bigdata.bop.IBindingSet; @@ -55,12 +53,14 @@ import com.bigdata.bop.IVariable; import com.bigdata.bop.IVariableOrConstant; import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; import com.bigdata.bop.Var; import com.bigdata.bop.ap.E; import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.ap.R; import com.bigdata.bop.bset.ConditionalRoutingOp; import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.constraint.EQ; import com.bigdata.bop.constraint.EQConstant; import com.bigdata.bop.fed.TestFederatedQueryEngine; import com.bigdata.bop.join.PipelineJoin; @@ -1143,7 +1143,6 @@ BOpEvaluationContext.CONTROLLER),// })); - // @todo the KEY_ORDER should be bound before evaluation.
final Predicate<?> pred1Op = new Predicate<E>(new IVariableOrConstant[] { Var.var("x"), Var.var("y") }, NV .asMap(new NV[] {// @@ -1411,16 +1410,212 @@ return nsuccess; } - + /** - * @todo Write unit tests for optional joins, including where an alternative - * sink is specified in the {@link BOpContext} and is used when the - * join fails. + * Unit test for optional join. Two joins are used and target a + * {@link SliceOp}. The 2nd join is marked as optional. Intermediate results + * which do not succeed on the optional join are forwarded to the + * {@link SliceOp} which is the target specified by the + * {@link PipelineOp.Annotations#ALT_SINK_REF}. + * + * @todo Write unit test for optional join groups. Here the goal is to + * verify that intermediate results may skip more than one join. This + * was a problem for the old query evaluation approach since binding + * sets had to cascade through the query one join at a time. However, + * the new query engine design should handle this case. 
*/ - public void test_query_join2_optionals() { + public void test_query_join2_optionals() throws Exception { - fail("write test"); + final int startId = 1; + final int joinId1 = 2; + final int predId1 = 3; + final int joinId2 = 4; + final int predId2 = 5; + final int sliceId = 6; + + final IVariable<?> x = Var.var("x"); + final IVariable<?> y = Var.var("y"); + final IVariable<?> z = Var.var("z"); + + final PipelineOp startOp = new StartOp(new BOp[] {}, + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + new NV(SliceOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final Predicate<?> pred1Op = new Predicate<E>( + new IVariableOrConstant[] { x, y }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId1),// + new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred2Op = new Predicate<E>( + new IVariableOrConstant[] { y, z }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId2),// + new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final PipelineOp join1Op = new PipelineJoin<E>(// + startOp, pred1Op,// + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + })); + final PipelineOp join2Op = new PipelineJoin<E>(// + join1Op, pred2Op,// + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, joinId2),// + // constraint x == z + new NV(PipelineJoin.Annotations.CONSTRAINTS,new IConstraint[]{ + new EQ(x,z) + }), + // join is optional. + new NV(PipelineJoin.Annotations.OPTIONAL,true),// + // optional target is the same as the default target. 
+ new NV(PipelineOp.Annotations.ALT_SINK_REF,sliceId),// + })); + + final PipelineOp sliceOp = new SliceOp(// + new BOp[]{join2Op}, + NV.asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, sliceId),// + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final PipelineOp query = sliceOp; + + // start the query. + final UUID queryId = UUID.randomUUID(); + final IChunkMessage<IBindingSet> initialChunkMessage; + { + + final IBindingSet initialBindings = new HashBindingSet(); + +// initialBindings.set(Var.var("x"), new Constant<String>("Mary")); + + initialChunkMessage = new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, startId,// + -1, // partitionId + newBindingSetIterator(initialBindings)); + } + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + initialChunkMessage); + + // verify solutions. + { + + // the expected solutions. + final IBindingSet[] expected = new IBindingSet[] {// + // two solutions where the 2nd join succeeds. + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("Leon"), + new Constant<String>("Paul"), + new Constant<String>("Leon") }// + ), + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Leon"), + new Constant<String>("Paul") }// + ), + // plus anything we read from the first access path which did not join. 
+ new ArrayBindingSet(// + new IVariable[] { Var.var("x"), Var.var("y") },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary") }// + ), + new ArrayBindingSet(// + new IVariable[] { Var.var("x"), Var.var("y") },// + new IConstant[] { new Constant<String>("Mary"), + new Constant<String>("Paul") }// + ) + }; + + assertSameSolutionsAnyOrder(expected, + new Dechunkerator<IBindingSet>(runningQuery.iterator())); + +// new E("John", "Mary"),// [0] +// new E("Leon", "Paul"),// [1] +// new E("Mary", "Paul"),// [2] +// new E("Paul", "Leon"),// [3] + } + + // Wait until the query is done. + runningQuery.get(); + final Map<Integer, BOpStats> statsMap = runningQuery.getStats(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(4, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + // validate the stats for the start operator. + { + final BOpStats stats = statsMap.get(startId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("start: " + stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(1L, stats.unitsIn.get()); + assertEquals(1L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + // validate the stats for the 1st join operator. + { + final BOpStats stats = statsMap.get(joinId1); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("join1: " + stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(1L, stats.unitsIn.get()); + assertEquals(4L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + // validate the stats for the 2nd join operator. + { + final BOpStats stats = statsMap.get(joinId2); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("join2: " + stats.toString()); + + // verify query solution stats details. 
+// assertEquals(1L, stats.chunksIn.get()); + assertEquals(4L, stats.unitsIn.get()); + assertEquals(4L, stats.unitsOut.get()); +// assertEquals(1L, stats.chunksOut.get()); + } + + // Validate stats for the sliceOp. + { + final BOpStats stats = statsMap.get(sliceId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("slice: " + stats.toString()); + + // verify query solution stats details. +// assertEquals(2L, stats.chunksIn.get()); + assertEquals(4L, stats.unitsIn.get()); + assertEquals(4L, stats.unitsOut.get()); +// assertEquals(1L, stats.chunksOut.get()); + } + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -35,7 +35,6 @@ import com.bigdata.bop.ArrayBindingSet; import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.Constant; import com.bigdata.bop.HashBindingSet; @@ -51,6 +50,7 @@ import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.ap.R; import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.constraint.EQ; import com.bigdata.bop.constraint.EQConstant; import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.IChunkMessage; @@ -1014,7 +1014,6 @@ new String[] { namespace }),// new NV(Predicate.Annotations.BOP_ID, predId1),// new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// -// new NV(Predicate.Annotations.KEY_ORDER,R.primaryKeyOrder),// })); final Predicate<?> pred2Op = new Predicate<E>(new IVariableOrConstant[] { @@ -1024,7 +1023,6 @@ new String[] { namespace }),// new NV(Predicate.Annotations.BOP_ID, predId2),// new 
NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// -// new NV(Predicate.Annotations.KEY_ORDER,R.primaryKeyOrder),// })); final PipelineOp join1Op = new PipelineJoin<E>(// @@ -1172,14 +1170,229 @@ } /** - * @todo Write unit tests for optional joins, including where an alternative - * sink is specified in the {@link BOpContext} and is used when the - * join fails. - * */ - public void test_query_join2_optionals() { + * Unit test for optional join. Two joins are used and target a + * {@link SliceOp}. The 2nd join is marked as optional. Intermediate results + * which do not succeed on the optional join are forwarded to the + * {@link SliceOp} which is the target specified by the + * {@link PipelineOp.Annotations#ALT_SINK_REF}. + * + * @todo Write unit test for optional join groups. Here the goal is to + * verify that intermediate results may skip more than one join. This + * was a problem for the old query evaluation approach since binding + * sets had to cascade through the query one join at a time. However, + * the new query engine design should handle this case. 
+ */ + public void test_query_join2_optionals() throws Exception { - fail("write test"); + final int startId = 1; + final int joinId1 = 2; + final int predId1 = 3; + final int joinId2 = 4; + final int predId2 = 5; + final int sliceId = 6; + + final IVariable<?> x = Var.var("x"); + final IVariable<?> y = Var.var("y"); + final IVariable<?> z = Var.var("z"); + + final PipelineOp startOp = new StartOp(new BOp[] {}, + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + new NV(SliceOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final Predicate<?> pred1Op = new Predicate<E>( + new IVariableOrConstant[] { x, y }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId1),// + new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred2Op = new Predicate<E>( + new IVariableOrConstant[] { y, z }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId2),// + new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final PipelineOp join1Op = new PipelineJoin<E>(// + startOp, pred1Op,// + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + // Note: shard-partitioned joins! + new NV( Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED),// + })); + final PipelineOp join2Op = new PipelineJoin<E>(// + join1Op, pred2Op,// + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, joinId2),// + // Note: shard-partitioned joins! + new NV( Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED),// + // constraint x == z + new NV(PipelineJoin.Annotations.CONSTRAINTS,new IConstraint[]{ + new EQ(x,z) + }), + // join is optional. 
+ new NV(PipelineJoin.Annotations.OPTIONAL,true),// + // optional target is the same as the default target. + new NV(PipelineOp.Annotations.ALT_SINK_REF,sliceId),// + })); + + final PipelineOp sliceOp = new SliceOp(// + new BOp[]{join2Op}, + NV.asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, sliceId),// + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final PipelineOp query = sliceOp; + + // start the query. + final UUID queryId = UUID.randomUUID(); + final IChunkMessage<IBindingSet> initialChunkMessage; + { + + final IBindingSet initialBindings = new HashBindingSet(); + +// initialBindings.set(Var.var("x"), new Constant<String>("Mary")); + + initialChunkMessage = new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, startId,// + -1, // partitionId + newBindingSetIterator(initialBindings)); + } + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + initialChunkMessage); + + // verify solutions. + { + + // the expected solutions. + final IBindingSet[] expected = new IBindingSet[] {// + // solutions where the 2nd join succeeds. + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary"), + new Constant<String>("John") }// + ), + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("Mary"), + new Constant<String>("John"), + new Constant<String>("Mary") }// + ), + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("Leon"), + new Constant<String>("Paul"), + new Constant<String>("Leon") }// + ), + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Leon"), + new Constant<String>("Paul") }// + ), + /* + * Plus anything we read from the first access path which + * did not pass the 2nd join. 
+ */ + new ArrayBindingSet(// + new IVariable[] { Var.var("x"), Var.var("y") },// + new IConstant[] { new Constant<String>("Mary"), + new Constant<String>("Paul") }// + ), + }; + + TestQueryEngine.assertSameSolutionsAnyOrder(expected, + new Dechunkerator<IBindingSet>(runningQuery.iterator())); + +// // partition0 +// new E("John", "Mary"),// +// new E("Leon", "Paul"),// +// // partition1 +// new E("Mary", "John"),// +// new E("Mary", "Paul"),// +// new E("Paul", "Leon"),// + } + + // Wait until the query is done. + runningQuery.get(); + final Map<Integer, BOpStats> statsMap = runningQuery.getStats(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(4, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + // validate the stats for the start operator. + { + final BOpStats stats = statsMap.get(startId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("start: " + stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(1L, stats.unitsIn.get()); + assertEquals(1L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + // validate the stats for the 1st join operator. + { + final BOpStats stats = statsMap.get(joinId1); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("join1: " + stats.toString()); + + // verify query solution stats details. + assertEquals(2L, stats.chunksIn.get()); + assertEquals(2L, stats.unitsIn.get()); + assertEquals(5L, stats.unitsOut.get()); + assertEquals(2L, stats.chunksOut.get()); + } + + // validate the stats for the 2nd join operator. + { + final BOpStats stats = statsMap.get(joinId2); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("join2: " + stats.toString()); + + // verify query solution stats details. 
+// assertEquals(1L, stats.chunksIn.get()); + assertEquals(5L, stats.unitsIn.get()); + assertEquals(5L, stats.unitsOut.get()); +// assertEquals(1L, stats.chunksOut.get()); + } + + // Validate stats for the sliceOp. + { + final BOpStats stats = statsMap.get(sliceId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("slice: " + stats.toString()); + + // verify query solution stats details. +// assertEquals(2L, stats.chunksIn.get()); + assertEquals(5L, stats.unitsIn.get()); + assertEquals(5L, stats.unitsOut.get()); +// assertEquals(1L, stats.chunksOut.get()); + } + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |