From: <tho...@us...> - 2010-08-27 14:57:31
|
Revision: 3467 http://bigdata.svn.sourceforge.net/bigdata/?rev=3467&view=rev Author: thompsonbry Date: 2010-08-27 14:57:24 +0000 (Fri, 27 Aug 2010) Log Message: ----------- Working on query engine handling of the output chunks by wrapping the FutureTask for the operator computation. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineStartOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/SampleLocalBTree.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/SampleLocalShard.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MapBindingSetsOverShards.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineDelayOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/ReceiveBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -27,7 +27,7 @@ package com.bigdata.bop; -import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import com.bigdata.bop.engine.BOpStats; import com.bigdata.relation.accesspath.BlockingBuffer; @@ -111,14 +111,17 @@ IBlockingBuffer<E[]> newBuffer(); /** - * Initiate execution for the operator, returning a {@link Future} which for - * that evaluation. + * Return a {@link FutureTask} which computes the operator against the + * evaluation context. The caller is responsible for executing the + * {@link FutureTask} (this gives them the ability to hook the completion of + * the computation). * * @param context * The evaluation context. * - * @return The {@link Future} for the operator's evaluation. + * @return The {@link FutureTask} which will compute the operator's + * evaluation. */ - Future<Void> eval(BOpContext<E> context); + FutureTask<Void> eval(BOpContext<E> context); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineStartOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineStartOp.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineStartOp.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -71,13 +71,9 @@ super(args, annotations); } - public Future<Void> eval(final BOpContext<IBindingSet> context) { + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - final FutureTask<Void> ft = new FutureTask<Void>(new CopyTask(context)); - - context.getIndexManager().getExecutorService().execute(ft); - - return ft; + return new FutureTask<Void>(new CopyTask(context)); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -32,7 +32,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import com.bigdata.bop.AbstractPipelineOp; @@ -233,14 +232,10 @@ } - public Future<Void> eval(final BOpContext<IBindingSet> context) { + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - final FutureTask<Void> ft = new FutureTask<Void>(new JoinGraphTask(context)); + return new FutureTask<Void>(new JoinGraphTask(context)); - context.getIndexManager().getExecutorService().execute(ft); - - return ft; - } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -320,14 +320,9 @@ } - public Future<Void> eval(final BOpContext<IBindingSet> context) { + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - final FutureTask<Void> ft = new FutureTask<Void>(new JoinTask(this, - context)); - - context.getIndexManager().getExecutorService().execute(ft); - - return ft; + return new FutureTask<Void>(new JoinTask(this, context)); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/SampleLocalBTree.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/SampleLocalBTree.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/SampleLocalBTree.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -1,7 +1,6 @@ package com.bigdata.bop.ndx; import java.util.concurrent.Callable; -import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import com.bigdata.bop.BOpContext; @@ -28,20 +27,15 @@ } - public Future<Void> eval(final BOpContext<E> context) { + public FutureTask<Void> eval(final BOpContext<E> context) { if (context.getPartitionId() != -1) { // Must not be specific to a shard. throw new UnsupportedOperationException(); } - final FutureTask<Void> ft = new FutureTask<Void>( - new LocalBTreeSampleTask(context)); + return new FutureTask<Void>(new LocalBTreeSampleTask(context)); - context.getIndexManager().getExecutorService().execute(ft); - - return ft; - } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/SampleLocalShard.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/SampleLocalShard.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/SampleLocalShard.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -33,20 +33,15 @@ /* * Note: This is done at evaluation time, local to the data. */ - public Future<Void> eval(final BOpContext<E> context) { + public FutureTask<Void> eval(final BOpContext<E> context) { if (context.getPartitionId() == -1) { // Must be specific to a shard. throw new UnsupportedOperationException(); } - final FutureTask<Void> ft = new FutureTask<Void>( - new LocalShardSampleTask(context)); + return new FutureTask<Void>(new LocalShardSampleTask(context)); - context.getIndexManager().getExecutorService().execute(ft); - - return ft; - } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MapBindingSetsOverShards.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MapBindingSetsOverShards.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MapBindingSetsOverShards.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -154,7 +154,7 @@ } - public Future<Void> eval(final BOpContext<IBindingSet> context) { + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { if (context.getFederation() == null) { @@ -169,16 +169,9 @@ } - /* - * Note: The caller's BlockingBuffer is ignored. - */ - final FutureTask<Void> ft = new FutureTask<Void>(new MapShardsTask( - context, sourceOp(), targetPred())); + return new FutureTask<Void>(new MapShardsTask(context, sourceOp(), + targetPred())); - context.getIndexManager().getExecutorService().execute(ft); - - return ft; - } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineDelayOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineDelayOp.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineDelayOp.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -2,9 +2,10 @@ import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import junit.framework.AssertionFailedError; + import com.bigdata.bop.AbstractPipelineOp; import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; @@ -12,8 +13,8 @@ import com.bigdata.bop.IBindingSet; /** - * This operator is used to feed the first join in the pipeline. The operator - * should have no children but may be decorated with annotations as necessary. + * Operator block evaluation for the specified {@link Annotations#DELAY} and + * then throws an {@link AssertionFailedError}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ @@ -66,23 +67,17 @@ return delay.longValue(); } - /** - * Blocks for the specified {@link Annotations#DELAY}. - */ - public Future<Void> eval(final BOpContext<IBindingSet> context) { + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { final FutureTask<Void> ft = new FutureTask<Void>(new Callable<Void>() { public Void call() throws Exception { Thread.sleep(delayMillis()); - TestQueryEngine.fail(); - return null; + throw new AssertionFailedError(); } }); - context.getIndexManager().getExecutorService().execute(ft); - return ft; } -} \ No newline at end of file +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -59,7 +59,6 @@ import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.resources.ResourceManager; import com.bigdata.service.IBigdataFederation; -import com.bigdata.striterator.IChunkedIterator; import com.bigdata.striterator.ICloseableIterator; /** @@ -429,10 +428,12 @@ private final Map<Integer,BOp> bopIndex; /** - * A collection of the currently executing futures. {@link Future}s are - * added to this collection + * A collection of the currently executing futures. {@link Future}s are + * added to this collection by {@link #newChunkTask(BindingSetChunk)}. + * They are removed when they are {@link Future#isDone()}. + * {@link Future}s are cancelled if the {@link RunningQuery} is halted. */ - private final ConcurrentHashMap<Future<?>,Future<?>> futures = new ConcurrentHashMap<Future<?>, Future<?>>(); + private final ConcurrentHashMap<Future<?>, Future<?>> futures = new ConcurrentHashMap<Future<?>, Future<?>>(); /** * @@ -459,6 +460,19 @@ } /** + * Create a {@link BindingSetChunk} from a sink and add it to the queue. + * + * @param sinkId + * @param sink + * + * @todo In scale-out, this is where we need to map the binding sets + * over the shards for the target operator. + */ + private void add(final int sinkId, final IBlockingBuffer<?> sink) { + throw new UnsupportedOperationException(); + } + + /** * Make a chunk of binding sets available for consumption by the query. * * @param chunk @@ -508,6 +522,9 @@ * nodes). * * @todo If eval of the chunk fails, halt() the query. + * + * @todo evaluation of element[] pipelines might run into type + * problems with the [queryBuffer]. */ final BOp bop = bopIndex.get(chunk.bopId); if (bop == null) { @@ -517,18 +534,47 @@ throw new UnsupportedOperationException(bop.getClass() .getName()); } + // sink + final Integer sinkId = null;// @todo from annotation (it is the parent). final IBlockingBuffer<?> sink = ((PipelineOp<?>) bop).newBuffer(); + // altSink final Integer altSinkId = (Integer) bop .getProperty(BindingSetPipelineOp.Annotations.ALT_SINK_REF); if (altSinkId != null && !bopIndex.containsKey(altSinkId)) { throw new NoSuchBOpException(altSinkId); } - final IBlockingBuffer<?> sink2 = altSinkId == null ? null + final IBlockingBuffer<?> altSink = altSinkId == null ? null : ((PipelineOp<?>) bop).newBuffer(); + // context final BOpContext context = new BOpContext(queryEngine.fed, queryEngine.indexManager, readTimestamp, writeTimestamp, - chunk.partitionId, stats, chunk.source, sink, sink2); - return ((PipelineOp)bop).eval(context); + chunk.partitionId, stats, chunk.source, sink, altSink); + // FutureTask for operator execution (not running yet). + final FutureTask<Void> f = ((PipelineOp)bop).eval(context); + // Hook the FutureTask. + final Runnable r = new Runnable() { + public void run() { + try { + f.run(); // run + f.get(); // verify success + add(sinkId, sink); // handle output chunk. + if (altSink != null) // handle alt sink output chunk. + add(altSinkId, altSink); + } catch (Throwable t) { + // operator failed on this chunk. + RunningQuery.this + .cancel(true/* mayInterruptIfRunning */); + log.error("queryId=" + queryId + ",bopId=" + + chunk.bopId + ",partitionId=" + + chunk.partitionId + " : " + t); + } + } + }; + // wrap runnable. + final FutureTask<Void> f2 = new FutureTask(r, null/* result */); + // add to list of active futures. + futures.put(f2, f2); + return f; } /** @@ -541,8 +587,8 @@ * @todo Do all queries produce solutions (mutation operations might * return a mutation count, but they do not return solutions). */ - public IChunkedIterator<IBindingSet> iterator() { - throw new UnsupportedOperationException(); + public IAsynchronousIterator<IBindingSet[]> iterator() { + return queryBuffer.iterator(); } /* @@ -553,7 +599,10 @@ * query. */ - final public boolean cancel(boolean mayInterruptIfRunning) { + final public boolean cancel(final boolean mayInterruptIfRunning) { + for (Future<?> f : futures.keySet()) { + f.cancel(mayInterruptIfRunning); + } return future.cancel(mayInterruptIfRunning); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/ReceiveBindingSets.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/ReceiveBindingSets.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/ReceiveBindingSets.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -28,7 +28,7 @@ package com.bigdata.bop.engine; import java.util.Map; -import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import org.apache.log4j.Logger; @@ -74,7 +74,7 @@ } - public Future<Void> eval(final BOpContext<IBindingSet> context) { + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { if (context.getFederation() == null) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -247,7 +247,7 @@ assertEquals(1L, stats.chunksOut.get()); // Verify no results. - final IChunkedIterator<IBindingSet> itr = runningQuery.iterator(); + final IAsynchronousIterator<IBindingSet[]> itr = runningQuery.iterator(); try { if (itr.hasNext()) fail("Not expecting any solutions"); @@ -323,14 +323,18 @@ -1, //partitionId newBindingSetIterator(EmptyBindingSet.INSTANCE))); - final IChunkedIterator<IBindingSet> itr = runningQuery.iterator(); + final IAsynchronousIterator<IBindingSet[]> itr = runningQuery.iterator(); try { int n = 0; while (itr.hasNext()) { - final IBindingSet e = itr.next(); + final IBindingSet[] e = itr.next(); if (log.isInfoEnabled()) - log.info(n + " : " + e); - assertEquals(expected[n], e); + log.info(n + " : chunkSize=" + e.length); + for (int i = 0; i < e.length; i++) { + if (log.isInfoEnabled()) + log.info(n + " : " + e[i]); + assertEquals(expected[n], e[i]); + } n++; } } finally { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-08-27 14:28:04 UTC (rev 3466) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-08-27 14:57:24 UTC (rev 3467) @@ -28,6 +28,8 @@ package com.bigdata.bop.join; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; import junit.framework.TestCase2; @@ -189,8 +191,10 @@ * (T2, B3) // T2:(Paul loves Leon) with B3:[A=Leon, B=Paul, ...]. * (T3, B2) // T3:(Leon loves Leon) with T2:[A=Paul, B=Leon, ...]. * </pre> + * @throws ExecutionException + * @throws InterruptedException */ - public void test_pipelineJoin() { + public void test_pipelineJoin() throws InterruptedException, ExecutionException { final int startId = 1; final int joinId = 2; @@ -241,7 +245,11 @@ ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats, source, sink, null/* sink2 */); - query.eval(context); + // get task. + final FutureTask<Void> ft = query.eval(context); + + // execute task. + jnl.getExecutorService().execute(ft); final IAsynchronousIterator<IBindingSet[]> itr = sink.iterator(); try { @@ -268,6 +276,10 @@ assertEquals(1L, stats.accessPathCount.get()); assertEquals(1L, stats.chunkCount.get()); assertEquals(1L, stats.elementCount.get()); + + assertTrue(ft.isDone()); + assertFalse(ft.isCancelled()); + ft.get(); // verify nothing thrown. } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |