From: <tho...@us...> - 2011-01-21 14:28:24
|
Revision: 4156 http://bigdata.svn.sourceforge.net/bigdata/?rev=4156&view=rev Author: thompsonbry Date: 2011-01-21 14:28:17 +0000 (Fri, 21 Jan 2011) Log Message: ----------- Modified Union (and its base class AbstractSubqueryOp) to consume the binding sets from the source and added a unit test for this. However, UNION still cannot be used in any position other than as the first operator in a pipeline because its operands are the subqueries. Right now, in order to use UNION in a non-primary position, you have to use a SubqueryOp to wrap the Union. And Union itself issues subqueries, so this is perhaps more overhead than we would like. In order to change this, we would have to move the subqueries to be executed by the union from operands to annotations, which might be just fine. Added some convenience methods to QueryEngine for evaluation against non-empty binding sets. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2011-01-21 13:17:01 UTC (rev 4155) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2011-01-21 14:28:17 UTC (rev 4156) @@ -32,6 +32,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; @@ -45,24 +46,26 @@ import 
com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.QueryEngine; import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.service.IBigdataFederation; import com.bigdata.util.concurrent.LatchedExecutor; /** * Executes each of the operands as a subquery. The operands are evaluated in * the order given and with the annotated parallelism. Each subquery is run as a * separate query but is linked to the parent query in the operator is being - * evaluated. The subqueries do not receive bindings from the parent and may be + * evaluated. The subqueries receive bindings from the pipeline and may be * executed independently. By default, the subqueries are run with unlimited - * parallelism. + * parallelism. Since the #of subqueries is generally small (2), this means that + * the subqueries run in parallel. * <p> * Note: This operator must execute on the query controller. * <p> * The {@link PipelineOp.Annotations#SINK_REF} of each child operand should be * overridden to specify the parent of the this operator. If you fail to do * this, then the intermediate results of the subqueries will be routed to this - * operator, which DOES NOT pass them on. This may cause unnecessary network - * traffic. It may also cause the query to block if the buffer capacity is - * limited. + * operator. This may cause unnecessary network traffic when running against the + * {@link IBigdataFederation}. It may also cause the query to block if the + * buffer capacity is limited. * <p> * If you want to route intermediate results from other computations into * subqueries, then consider a {@link Tee} pattern instead. @@ -76,9 +79,9 @@ * </pre> * * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each - * subquery will be initialized with a single empty {@link IBindingSet}. The - * output of those subqueries MUST be explicitly routed to the SLICE operator - * using {@link PipelineOp.Annotations#SINK_REF} on each of the subqueries. 
+ * subquery will be run once for each source {@link IBindingSet}. The output of + * those subqueries is explicitly routed to the SLICE operator using + * {@link PipelineOp.Annotations#SINK_REF} for efficiency in scale-out. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ @@ -168,13 +171,12 @@ private final AbstractSubqueryOp controllerOp; private final BOpContext<IBindingSet> context; - private final List<FutureTask<IRunningQuery>> tasks = new LinkedList<FutureTask<IRunningQuery>>(); - private final CountDownLatch latch; private final int nparallel; private final Executor executor; - - public ControllerTask(final AbstractSubqueryOp controllerOp, final BOpContext<IBindingSet> context) { + public ControllerTask(final AbstractSubqueryOp controllerOp, + final BOpContext<IBindingSet> context) { + if (controllerOp == null) throw new IllegalArgumentException(); @@ -191,47 +193,90 @@ this.executor = new LatchedExecutor(context.getIndexManager() .getExecutorService(), nparallel); - this.latch = new CountDownLatch(controllerOp.arity()); + } - /* - * Create FutureTasks for each subquery. The futures are not - * submitted to the Executor yet. That happens in call(). By - * deferring the evaluation until call() we gain the ability to - * cancel all subqueries if any subquery fails. - */ - for (BOp op : controllerOp.args()) { + /** + * Evaluate the subqueries with limited parallelism. + */ + public Void call() throws Exception { - /* - * Task runs subquery and cancels all subqueries in [tasks] if - * it fails. - */ - tasks.add(new FutureTask<IRunningQuery>(new SubqueryTask(op, - context)) { - /* - * Hook future to count down the latch when the task is - * done. 
- */ - public void run() { - try { - super.run(); - } finally { - latch.countDown(); - } + final IAsynchronousIterator<IBindingSet[]> source = context + .getSource(); + + try { + + while (source.hasNext()) { + + final IBindingSet[] chunk = source.next(); + + for (IBindingSet bset : chunk) { + + consumeBindingSet(bset); + } - }); + + } + // Now that we know the subqueries ran Ok, flush the sink. + context.getSink().flush(); + + // Done. + return null; + + } finally { + + // Close the source. + source.close(); + + context.getSink().close(); + + if (context.getSink2() != null) + context.getSink2().close(); + } - + } - /** - * Evaluate the subqueries with limited parallelism. - */ - public Void call() throws Exception { + private void consumeBindingSet(final IBindingSet bset) + throws InterruptedException, ExecutionException { + final List<FutureTask<IRunningQuery>> tasks = new LinkedList<FutureTask<IRunningQuery>>(); + try { + final CountDownLatch latch = new CountDownLatch(controllerOp + .arity()); + /* + * Create FutureTasks for each subquery. The futures are not + * submitted to the Executor yet. That happens in call(). By + * deferring the evaluation until call() we gain the ability to + * cancel all subqueries if any subquery fails. + */ + for (BOp op : controllerOp.args()) { + + /* + * Task runs subquery and cancels all subqueries in [tasks] + * if it fails. + */ + tasks.add(new FutureTask<IRunningQuery>(new SubqueryTask( + op, context, bset)) { + /* + * Hook future to count down the latch when the task is + * done. + */ + public void run() { + try { + super.run(); + } finally { + latch.countDown(); + } + } + }); + + } + + /* * Run subqueries with limited parallelism. */ for (FutureTask<IRunningQuery> ft : tasks) { @@ -239,12 +284,6 @@ } /* - * Close the source. Controllers do not accept inputs from the - * pipeline. - */ - context.getSource().close(); - - /* * Wait for all subqueries to complete. 
*/ latch.await(); @@ -255,25 +294,14 @@ for (FutureTask<IRunningQuery> ft : tasks) ft.get(); - // Now that we know the subqueries ran Ok, flush the sink. - context.getSink().flush(); - - // Done. - return null; - } finally { // Cancel any tasks which are still running. for (FutureTask<IRunningQuery> ft : tasks) ft.cancel(true/* mayInterruptIfRunning */); - - context.getSink().close(); - - if (context.getSink2() != null) - context.getSink2().close(); } - + } /** @@ -294,12 +322,20 @@ */ private final BOp subQueryOp; + /** + * The input for this invocation of the subquery. + */ + private final IBindingSet bset; + public SubqueryTask(final BOp subQuery, - final BOpContext<IBindingSet> parentContext) { + final BOpContext<IBindingSet> parentContext, + final IBindingSet bset) { this.subQueryOp = subQuery; this.parentContext = parentContext; + + this.bset = bset; } @@ -312,7 +348,7 @@ final QueryEngine queryEngine = parentContext.getRunningQuery() .getQueryEngine(); - runningSubquery = queryEngine.eval(subQueryOp); + runningSubquery = queryEngine.eval(subQueryOp, bset); // Iterator visiting the subquery solutions. subquerySolutionItr = runningSubquery.iterator(); @@ -343,8 +379,10 @@ * Such exceptions are NOT propagated here and WILL NOT * cause the parent query to terminate. */ - throw new RuntimeException(ControllerTask.this.context - .getRunningQuery().halt(runningSubquery.getCause())); + throw new RuntimeException(ControllerTask.this.context + .getRunningQuery().halt( + runningSubquery == null ? 
t + : runningSubquery.getCause())); } return runningSubquery; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-01-21 13:17:01 UTC (rev 4155) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-01-21 14:28:17 UTC (rev 4156) @@ -28,7 +28,6 @@ package com.bigdata.bop.controller; import java.util.Map; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; @@ -40,10 +39,8 @@ import com.bigdata.bop.NV; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.engine.IRunningQuery; -import com.bigdata.bop.engine.LocalChunkMessage; import com.bigdata.bop.engine.QueryEngine; import com.bigdata.relation.accesspath.IAsynchronousIterator; -import com.bigdata.relation.accesspath.ThickAsynchronousIterator; /** * For each binding set presented, this operator executes a subquery. Any @@ -391,25 +388,28 @@ final QueryEngine queryEngine = parentContext.getRunningQuery() .getQueryEngine(); - final BOp startOp = BOpUtility.getPipelineStart(subQueryOp); - - final int startId = startOp.getId(); +// final BOp startOp = BOpUtility.getPipelineStart(subQueryOp); +// +// final int startId = startOp.getId(); +// +// final UUID queryId = UUID.randomUUID(); +// +// // execute the subquery, passing in the source binding set. 
+// runningSubquery = queryEngine +// .eval( +// queryId, +// (PipelineOp) subQueryOp, +// new LocalChunkMessage<IBindingSet>( +// queryEngine, +// queryId, +// startId, +// -1 /* partitionId */, +// new ThickAsynchronousIterator<IBindingSet[]>( +// new IBindingSet[][] { new IBindingSet[] { bset } }))); - final UUID queryId = UUID.randomUUID(); + runningSubquery = queryEngine.eval((PipelineOp) subQueryOp, + bset); - // execute the subquery, passing in the source binding set. - runningSubquery = queryEngine - .eval( - queryId, - (PipelineOp) subQueryOp, - new LocalChunkMessage<IBindingSet>( - queryEngine, - queryId, - startId, - -1 /* partitionId */, - new ThickAsynchronousIterator<IBindingSet[]>( - new IBindingSet[][] { new IBindingSet[] { bset } }))); - long ncopied = 0L; try { @@ -491,9 +491,11 @@ * Such exceptions are NOT propagated here and WILL NOT * cause the parent query to terminate. */ - throw new RuntimeException(ControllerTask.this.context - .getRunningQuery().halt(runningSubquery.getCause())); - } + throw new RuntimeException(ControllerTask.this.context + .getRunningQuery().halt( + runningSubquery == null ? t + : runningSubquery.getCause())); + } return runningSubquery; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java 2011-01-21 13:17:01 UTC (rev 4155) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java 2011-01-21 14:28:17 UTC (rev 4156) @@ -35,15 +35,15 @@ /** * UNION(ops)[maxParallel(default all)] + * * <pre> * UNION([a,b,c],{}) * </pre> * - * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each - * subquery will be initialized with a single empty {@link IBindingSet}. 
The - * output of those subqueries will be routed to the UNION operator (their - * parent) unless the subqueries explicitly override this behavior using - * {@link PipelineOp.Annotations#SINK_REF}. + * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel for each + * source {@link IBindingSet}. The output of those subqueries will be routed to + * the UNION operator (their parent) unless the subqueries explicitly override + * this behavior using {@link PipelineOp.Annotations#SINK_REF}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-01-21 13:17:01 UTC (rev 4155) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-01-21 14:28:17 UTC (rev 4156) @@ -791,16 +791,57 @@ */ public AbstractRunningQuery eval(final BOp op) throws Exception { + return eval(op, new ListBindingSet()); + + } + + /** + * Evaluate a query. This node will serve as the controller for the query. + * + * @param query + * The query to evaluate. + * @param bset + * The initial binding set to present. + * + * @return The {@link IRunningQuery}. + * + * @throws IllegalStateException + * if the {@link QueryEngine} has been {@link #shutdown()}. + * @throws Exception + */ + public AbstractRunningQuery eval(final BOp op, final IBindingSet bset) + throws Exception { + + return eval(op, newBindingSetIterator(bset)); + + } + + /** + * Evaluate a query. This node will serve as the controller for the query. + * + * @param query + * The query to evaluate. + * @param bsets + * The binding sets to be consumed by the query. + * + * @return The {@link IRunningQuery}. + * + * @throws IllegalStateException + * if the {@link QueryEngine} has been {@link #shutdown()}. 
+ * @throws Exception + */ + public AbstractRunningQuery eval(final BOp op, + final IAsynchronousIterator<IBindingSet[]> bsets) throws Exception { + final BOp startOp = BOpUtility.getPipelineStart(op); final int startId = startOp.getId(); - + final UUID queryId = UUID.randomUUID(); return eval(queryId, (PipelineOp) op, new LocalChunkMessage<IBindingSet>(this/* queryEngine */, - queryId, startId, -1 /* partitionId */, - newBindingSetIterator(new ListBindingSet()))); + queryId, startId, -1 /* partitionId */, bsets)); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |