From: <tho...@us...> - 2010-09-17 10:44:08
Revision: 3577
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3577&view=rev
Author:   thompsonbry
Date:     2010-09-17 10:44:01 +0000 (Fri, 17 Sep 2010)

Log Message:
-----------
Fixed SliceOp and added a concurrent stress test.

Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java	2010-09-17 10:19:27 UTC (rev 3576)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java	2010-09-17 10:44:01 UTC (rev 3577)
@@ -27,6 +27,7 @@
 
 package com.bigdata.bop.solutions;
 
+import java.math.BigInteger;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
@@ -40,6 +41,7 @@
 import com.bigdata.bop.BindingSetPipelineOp;
 import com.bigdata.bop.IBindingSet;
 import com.bigdata.bop.engine.BOpStats;
+import com.bigdata.bop.engine.QueryEngine;
 import com.bigdata.bop.engine.RunningQuery;
 import com.bigdata.relation.accesspath.IAsynchronousIterator;
 import com.bigdata.relation.accesspath.IBlockingBuffer;
@@ -65,20 +67,18 @@
  * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
  * @version $Id$
  * 
- * @todo unit test with stress test for concurrent {@link SliceOp} invocations
- *       against a streaming chunk producer.
+ * @todo Unit test with stress test for concurrent {@link SliceOp} invocations
+ *       against a streaming chunk producer. Make sure that the same
+ *       {@link SliceStats} are used for each concurrent invocation of the same
+ *       query.
  * 
- * @todo If this operator is invoked for each chunk output by a query onto the
- *       pipeline then it will over produce unless (A) it is given the same
- *       {@link BOpStats} each time; and (B) it is not invoked for two chunks
- *       concurrently.
- *       <p>
- *       A safer way to impose the slice constraint is by wrapping the query
- *       buffer on the query controller. Once the slice is satisfied, it can
- *       just cancel the query. The only drawback of this approach is that the
- *       wrapping a buffer is not really the same as applying a {@link BOp} to
- *       the pipeline so it falls outside of the standard operator evaluation
- *       logic.
+ * @todo What is sufficient serialization to make SLICE(ORDER_BY(...)) stable?
+ *       The {@link SortOp} will impose a total ordering and will know how to
+ *       deliver that total ordering to another operator. The {@link SliceOp}
+ *       needs to accept the chunks from the {@link SortOp} in the order in
+ *       which they were sent. This should work as long as we do not reorder the
+ *       chunks for a given operator in the {@link QueryEngine} when they are
+ *       received by the query controller.
 * 
 * @todo If we allow complex operator trees in which "subqueries" can also use a
 *       slice then either they need to run as their own query with their own
@@ -175,8 +175,10 @@
          */
         private static final long serialVersionUID = 1L;
 
+        /** #of solutions visited. */
         public final AtomicLong nseen = new AtomicLong();
 
+        /** #of solutions accepted. */
         public final AtomicLong naccepted = new AtomicLong();
 
         @Override
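
A note on the first @todo above: the fix hinges on every concurrent invocation of the operator for the same query sharing one SliceStats instance, so that nseen and naccepted are global counters rather than per-invocation ones. Below is a minimal sketch of the idea (class and method names are hypothetical, not the bigdata API; the commit itself goes further and batches whole chunks through synchronized(stats) so that both counters advance together):

    import java.util.concurrent.atomic.AtomicLong;

    public class SharedSliceCounter {

        static final long LIMIT = 3;

        // One counter per *query*, shared by every invocation of the operator.
        static final AtomicLong naccepted = new AtomicLong();

        /** @return true iff the caller may emit this solution. */
        static boolean tryAccept() {
            // The atomic increment is the admission decision; no invocation
            // can observe a stale count and push the total past LIMIT.
            return naccepted.getAndIncrement() < LIMIT;
        }

        public static void main(final String[] args) {
            for (int i = 0; i < 5; i++)
                System.out.println(i + " accepted=" + tryAccept()); // true x3, then false x2
        }
    }

If each invocation instead constructed its own counter, each would accept up to LIMIT solutions and the slice as a whole would over-produce, which is exactly the failure mode the removed @todo warned about.
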
@@ -235,6 +237,8 @@
         /** #of solutions to accept. */
         private final long limit;
 
+        private final long last;
+
 //        /** #of solutions visited. */
 //        private long nseen;
 //
@@ -261,6 +265,11 @@
 
             this.stats = (SliceStats) context.getStats();
 
+//            this.last = offset + limit;
+            this.last = BigInteger.valueOf(offset).add(
+                    BigInteger.valueOf(limit)).min(
+                    BigInteger.valueOf(Long.MAX_VALUE)).longValue();
+
         }
 
         public Void call() throws Exception {
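
The BigInteger round trip above is simply a saturating add. The limit defaults to Long.MAX_VALUE when the slice is unbounded, so the plain long sum offset + limit (the commented-out line) can wrap negative. A self-contained sketch of the same computation (class and method names are illustrative only, not part of the commit):

    import java.math.BigInteger;

    public class SaturatingAdd {

        /** Overflow-safe equivalent of last = offset + limit, clamped to Long.MAX_VALUE. */
        static long lastIndex(final long offset, final long limit) {
            return BigInteger.valueOf(offset).add(BigInteger.valueOf(limit))
                    .min(BigInteger.valueOf(Long.MAX_VALUE)).longValue();
        }

        public static void main(final String[] args) {
            System.out.println(2L + Long.MAX_VALUE);           // wraps to -9223372036854775807
            System.out.println(lastIndex(2L, Long.MAX_VALUE)); // clamped to 9223372036854775807
            System.out.println(lastIndex(2L, 3L));             // 5
        }
    }
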
@@ -277,96 +286,175 @@
 
             try {
 
-                // buffer forms chunks which get flushed onto the sink.
+                /*
+                 * buffer forms chunks which get flushed onto the sink.
+                 * 
+                 * @todo if we have visibility into the #of source chunks, then
+                 * do not buffer more than min(#source,#needed).
+                 */
                 final UnsynchronizedArrayBuffer<IBindingSet> out = new UnsynchronizedArrayBuffer<IBindingSet>(
                         sink, op.getChunkCapacity());
 
                 boolean halt = false;
 
-                while (source.hasNext()) {
+                while (source.hasNext() && !halt) {
 
+                    final IBindingSet[] chunk = source.next();
+
                     /*
-                     * @todo batch each chunk through a lock for better
-                     * concurrency (avoids CAS contention).
+                     * Batch each chunk through a lock for better concurrency
+                     * (avoids CAS contention).
+                     * 
+                     * Note: This is safe because the source chunk is already
+                     * materialized and the sink will not block (that is part of
+                     * the bop evaluation contract).
                      */
-                    final IBindingSet[] chunk = source.next();
-
-                    stats.chunksIn.increment();
-
-                    for (int i = 0; i < chunk.length; i++) {
-
-                        stats.unitsIn.increment();
-
-                        if (stats.nseen.incrementAndGet() <= offset) {
-                            // skip solution.
-                            if (log.isTraceEnabled())
-                                log.trace(toString());
-                            continue;
-                        }
-
-                        final IBindingSet bset = chunk[i];
+                    synchronized (stats) {
 
-                        if (out.add2(bset)) {
-                            // chunk was output.
-//                            stats.chunksOut.increment();
-                        }
+                        if (handleChunk(out, chunk)) {
 
-                        if (log.isTraceEnabled())
-                            log.trace(toString() + ":" + bset);
+                            halt = true;
 
-//                        stats.unitsOut.increment();
-
-                        if (stats.naccepted.incrementAndGet() >= limit) {
-                            if (!out.isEmpty()) {
-                                out.flush();
-//                                stats.chunksOut.increment();
-                            }
-                            halt = true;
-                            break;
                         }
 
                     }
-
                 }
 
-                out.flush();
+                if (!out.isEmpty())
+                    out.flush();
+
                 sink.flush();
-//                stats.chunksOut.increment();
 
                 if (halt)
                     throw new InterruptedException();
 
-//                cancelQuery();
-
+                // cancelQuery();
+
                 return null;
-
+
             } finally {
-
+
                 sink.close();
-
+
             }
 
         }
 
-//        /**
-//         * Cancel the query evaluation. This is invoked when the slice has been
-//         * satisfied. At that point we want to halt not only the {@link SliceOp}
-//         * but also the entire query since it does not need to produce any more
-//         * results.
-//         */
-//        private void cancelQuery() {
-//
-//            context.halt();
-//
-//        }
+        /**
+         * <p>
+         * Apply the slice semantics to a chunk of binding sets.
+         * </p>
+         * <h2>example</h2>
+         * <p>
+         * offset=2, limit=3, last=3+2=5. The number line represents the
+         * observed binding sets. The first binding set is at index ZERO (0).
+         * The initial conditions are: nseen(S)=0 and naccepted(A)=0. S is
+         * placed beneath each observation and paired with the value of A for
+         * that observation. The offset is satisfied when S=2 and observation
+         * TWO (2) is the first observation accepted. The limit is satisfied
+         * when A=3, which occurs at observation FOUR (4), which is also
+         * S=last=5. The observation on which the limit is satisfied is accepted
+         * and the slice halts as no more observations should be made. {2,3,4}
+         * are accepted.
+         * </p>
+         * 
+         * <pre>
+         * 0 1 2 3 4 5 6 7 8 9
+         * S=1, A=0
+         * </pre>
+         * 
+         * <pre>
+         * 0 1 2 3 4 5 6 7 8 9
+         * S=2, A=0
+         * </pre>
+         * 
+         * <pre>
+         * 0 1 2 3 4 5 6 7 8 9
+         * S=3, A=1 {2}
+         * </pre>
+         * 
+         * <pre>
+         * 0 1 2 3 4 5 6 7 8 9
+         * S=4, A=2 {2,3}
+         * </pre>
+         * 
+         * <pre>
+         * 0 1 2 3 4 5 6 7 8 9
+         * S=5, A=3 {2,3,4}
+         * </pre>
+         * <p>
+         * Note: The caller MUST be synchronized on the <em>shared</em>
+         * {@link SliceStats} in order for the decision process to be thread
+         * safe.
+         * 
+         * @param out
+         *            The buffer onto which accepted solutions are written.
+         * @param chunk
+         *            The chunk of binding sets.
+         * 
+         * @return <code>true</code> if the slice is satisfied and the query
+         *         should halt.
+         */
+        private boolean handleChunk(
+                final UnsynchronizedArrayBuffer<IBindingSet> out,
+                final IBindingSet[] chunk) {
+
+            stats.chunksIn.increment();
+
+            for (int i = 0; i < chunk.length; i++) {
+
+                if (stats.naccepted.get() >= limit)
+                    return true; // nothing more will be accepted.
+
+                stats.unitsIn.increment();
+
+                final long S = stats.nseen.incrementAndGet();
+
+                if (S <= offset)
+                    continue; // skip solution.
+
+                final long A = stats.naccepted.get();
+
+                if (A < limit) {
+
+                    final IBindingSet bset = chunk[i];
+
+                    out.add(bset);
+
+                    stats.naccepted.incrementAndGet();
+
+                    if (log.isTraceEnabled())
+                        log.trace(toString() + ":" + bset);
+
+                }
+
+            } // next bindingSet
+
+            return false;
+
+        }
+
+        // /**
+        // * Cancel the query evaluation. This is invoked when the slice has
+        // been
+        // * satisfied. At that point we want to halt not only the {@link
+        // SliceOp}
+        // * but also the entire query since it does not need to produce any
+        // more
+        // * results.
+        // */
+        // private void cancelQuery() {
+        //
        // context.halt();
        //
        // }
+
         public String toString() {
 
             return getClass().getName() + "{offset=" + offset + ",limit="
                     + limit + ",nseen=" + stats.nseen + ",naccepted="
                     + stats.naccepted + "}";
-            
+
         }
-        
+
     }
 
     /**
@@ -374,9 +462,9 @@
      */
     @Override
     public BOpEvaluationContext getEvaluationContext() {
-        
+
         return BOpEvaluationContext.CONTROLLER;
-        
+
     }
 
 }
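
The worked example in the new handleChunk() javadoc can be replayed in a few lines of plain Java. The following is a stand-alone simulation of the documented decision rule only, not the operator itself (all names illustrative); with offset=2 and limit=3 it accepts exactly {2,3,4} and then halts:

    public class SliceTrace {

        public static void main(final String[] args) {
            final long offset = 2, limit = 3;
            long nseen = 0, naccepted = 0; // S and A in the javadoc
            for (int i = 0; i < 10; i++) {
                if (naccepted >= limit) {
                    System.out.println("halt at observation " + i);
                    break;
                }
                nseen++; // S
                if (nseen <= offset)
                    continue; // skip: offset not yet satisfied.
                naccepted++; // A
                System.out.println("accept " + i + " (S=" + nseen + ", A=" + naccepted + ")");
            }
            // prints: accept 2 (S=3, A=1), accept 3 (S=4, A=2),
            //         accept 4 (S=5, A=3), halt at observation 5
        }
    }
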
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java	2010-09-17 10:19:27 UTC (rev 3576)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java	2010-09-17 10:44:01 UTC (rev 3577)
@@ -28,8 +28,16 @@
 package com.bigdata.bop.solutions;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import junit.framework.TestCase2;
 
@@ -37,6 +45,7 @@
 import com.bigdata.bop.BOp;
 import com.bigdata.bop.BOpContext;
 import com.bigdata.bop.Constant;
+import com.bigdata.bop.EmptyBindingSet;
 import com.bigdata.bop.HashBindingSet;
 import com.bigdata.bop.IBindingSet;
 import com.bigdata.bop.IConstant;
@@ -44,8 +53,11 @@
 import com.bigdata.bop.NV;
 import com.bigdata.bop.Var;
 import com.bigdata.bop.engine.BOpStats;
+import com.bigdata.bop.engine.IRunningQuery;
 import com.bigdata.bop.engine.MockRunningQuery;
 import com.bigdata.bop.engine.TestQueryEngine;
+import com.bigdata.bop.solutions.SliceOp.SliceStats;
+import com.bigdata.relation.accesspath.BlockingBuffer;
 import com.bigdata.relation.accesspath.IAsynchronousIterator;
 import com.bigdata.relation.accesspath.IBlockingBuffer;
 import com.bigdata.relation.accesspath.ThickAsynchronousIterator;
@@ -142,6 +154,113 @@
      * @throws ExecutionException
      * @throws InterruptedException
      */
+    public void test_slice_offset2_limit3() throws InterruptedException,
+            ExecutionException {
+
+        final Var<?> x = Var.var("x");
+        final Var<?> y = Var.var("y");
+
+        final int bopId = 1;
+
+        final long offset = 2L;
+        final long limit = 3L;
+
+        final SliceOp query = new SliceOp(new BOp[]{},
+                NV.asMap(new NV[]{//
+                        new NV(SliceOp.Annotations.BOP_ID, bopId),//
+                        new NV(SliceOp.Annotations.OFFSET, offset),//
+                        new NV(SliceOp.Annotations.LIMIT, limit),//
+                }));
+
+        assertEquals("offset", offset, query.getOffset());
+
+        assertEquals("limit", limit, query.getLimit());
+
+        // the expected solutions
+        final IBindingSet[] expected = new IBindingSet[] {//
+//                new ArrayBindingSet(//
+//                        new IVariable[] { x, y },//
+//                        new IConstant[] { new Constant<String>("John"),
+//                                new Constant<String>("Mary"), }//
+//                ),
+//                new ArrayBindingSet(//
+//                        new IVariable[] { x, y },//
+//                        new IConstant[] { new Constant<String>("Mary"),
+//                                new Constant<String>("Paul"), }//
+//                ),
+                new ArrayBindingSet(//
+                        new IVariable[] { x, y },//
+                        new IConstant[] { new Constant<String>("Mary"),
+                                new Constant<String>("Jane") }//
+                ),
+                new ArrayBindingSet(//
+                        new IVariable[] { x, y },//
+                        new IConstant[] { new Constant<String>("Paul"),
+                                new Constant<String>("Leon") }//
+                ),
+                new ArrayBindingSet(//
+                        new IVariable[] { x, y },//
+                        new IConstant[] { new Constant<String>("Paul"),
+                                new Constant<String>("John") }//
+                ),
+//                new ArrayBindingSet(//
+//                        new IVariable[] { x, y },//
+//                        new IConstant[] { new Constant<String>("Leon"),
+//                                new Constant<String>("Paul") }//
+//                ),
+        };
+
+        final SliceStats stats = query.newStats();
+
+        final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>(
+                new IBindingSet[][] { data.toArray(new IBindingSet[0]) });
+
+        final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer(stats);
+
+        final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
+                new MockRunningQuery(null/* fed */, null/* indexManager */
+                ), -1/* partitionId */, stats, source, sink, null/* sink2 */);
+
+        // get task.
+        final FutureTask<Void> ft = query.eval(context);
+
+        ft.run();
+
+        TestQueryEngine.assertSameSolutions(expected, sink.iterator());
+
+        assertTrue(ft.isDone());
+        assertFalse(ft.isCancelled());
+        try {
+            ft.get(); // should throw: the slice interrupts the query once satisfied.
+            fail("Expecting inner cause : " + InterruptedException.class);
+        } catch (Throwable t) {
+            if (InnerCause.isInnerCause(t, InterruptedException.class)) {
+                if (log.isInfoEnabled())
+                    log.info("Ignoring expected exception: " + t, t);
+            } else {
+                fail("Expecting inner cause : " + InterruptedException.class);
+            }
+        }
+
+        // check the slice stats first.
+        assertEquals(limit, stats.naccepted.get());
+        assertEquals(offset + limit, stats.nseen.get());
+
+        // then the general purpose bop stats (less critical).
+        assertEquals(1L, stats.chunksIn.get());
+        assertEquals(offset + limit, stats.unitsIn.get());
+        assertEquals(limit, stats.unitsOut.get());
+        assertEquals(1L, stats.chunksOut.get());
+
+    }
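
Note how the new test treats an exception whose root cause is an InterruptedException as success: SliceOp deliberately interrupts the query once the slice is satisfied, so ft.get() is expected to throw. The check uses bigdata's InnerCause.isInnerCause() helper; the class below is a minimal stand-in showing the cause-chain walk (illustrative only, not the actual utility):

    import java.util.concurrent.ExecutionException;

    public class InnerCauseSketch {

        /** Walk the cause chain looking for an instance of cls. */
        static boolean isInnerCause(final Throwable t,
                final Class<? extends Throwable> cls) {
            for (Throwable c = t; c != null; c = c.getCause()) {
                if (cls.isInstance(c))
                    return true;
            }
            return false;
        }

        public static void main(final String[] args) {
            final Throwable t = new ExecutionException(new RuntimeException(
                    new InterruptedException()));
            System.out.println(isInnerCause(t, InterruptedException.class));  // true
            System.out.println(isInnerCause(t, IllegalStateException.class)); // false
        }
    }
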
+
+    /**
+     * Unit test for correct visitation for a variety of offset/limit values.
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
     public void test_slice_offset1_limit3() throws InterruptedException,
             ExecutionException {
 
@@ -149,17 +268,20 @@
         final Var<?> y = Var.var("y");
 
         final int bopId = 1;
+
+        final long offset = 1;
+        final long limit = 3;
 
         final SliceOp query = new SliceOp(new BOp[]{},
                 NV.asMap(new NV[]{//
                         new NV(SliceOp.Annotations.BOP_ID, bopId),//
-                        new NV(SliceOp.Annotations.OFFSET, 1L),//
-                        new NV(SliceOp.Annotations.LIMIT, 3L),//
+                        new NV(SliceOp.Annotations.OFFSET, offset),//
+                        new NV(SliceOp.Annotations.LIMIT, limit),//
                 }));
 
-        assertEquals("offset", 1L, query.getOffset());
+        assertEquals("offset", offset, query.getOffset());
 
-        assertEquals("limit", 3L, query.getLimit());
+        assertEquals("limit", limit, query.getLimit());
 
         // the expected solutions
         final IBindingSet[] expected = new IBindingSet[] {//
@@ -179,7 +301,7 @@
                         new Constant<String>("Leon") }//
                 ),
         };
 
-        final BOpStats stats = query.newStats();
+        final SliceStats stats = query.newStats();
 
         final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>(
                 new IBindingSet[][] { data.toArray(new IBindingSet[0]) });
@@ -212,6 +334,9 @@
             }
         }
 
+        assertEquals(limit, stats.naccepted.get());
+        assertEquals(offset + limit, stats.nseen.get());
+
         assertEquals(1L, stats.chunksIn.get());
         assertEquals(4L, stats.unitsIn.get());
         assertEquals(3L, stats.unitsOut.get());
@@ -222,9 +347,6 @@
     public void test_slice_offset0_limitAll() throws InterruptedException,
             ExecutionException {
 
-        final Var<?> x = Var.var("x");
-        final Var<?> y = Var.var("y");
-
         final int bopId = 1;
 
         final SliceOp query = new SliceOp(new BOp[] {},
                 NV.asMap(new NV[] {//
@@ -240,7 +362,7 @@
         // the expected solutions
         final IBindingSet[] expected = data.toArray(new IBindingSet[0]);
 
-        final BOpStats stats = query.newStats();
+        final SliceStats stats = query.newStats();
 
         final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>(
                 new IBindingSet[][] { data.toArray(new IBindingSet[0]) });
@@ -266,6 +388,8 @@
         assertEquals(6L, stats.unitsIn.get());
         assertEquals(6L, stats.unitsOut.get());
         assertEquals(1L, stats.chunksOut.get());
+        assertEquals(6L, stats.nseen.get());
+        assertEquals(6L, stats.naccepted.get());
 
     }
 
@@ -342,4 +466,146 @@
 
     }
 
+    public void test_slice_threadSafe() throws Exception {
+
+        final long timeout = 10000; // ms
+
+        final int ntrials = 10000;
+
+        final int poolSize = 10;
+
+        doStressTest(500L/* offset */, 1500L/* limit */, timeout, ntrials,
+                poolSize);
+
+    }
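
test_slice_threadSafe() above drives doStressTest(), defined next, with 10,000 trials on a 10-thread pool, all sharing a single SliceStats, and then asserts that exactly `limit` solutions were accepted in total. A stripped-down model of that invariant is sketched below; it assumes one binding set per task (the real test uses chunks of 1-10) and a plain Object standing in for the shared SliceStats monitor:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    public class SliceRace {

        public static void main(final String[] args) throws Exception {
            final long offset = 500, limit = 1500;
            final AtomicLong nseen = new AtomicLong();
            final AtomicLong naccepted = new AtomicLong();
            final Object stats = new Object(); // stands in for the shared SliceStats
            final ExecutorService pool = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 10000; i++) {
                pool.execute(new Runnable() {
                    public void run() {
                        synchronized (stats) {
                            if (naccepted.get() >= limit)
                                return; // slice satisfied; drop the chunk.
                            if (nseen.incrementAndGet() > offset)
                                naccepted.incrementAndGet();
                        }
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
            System.out.println(naccepted.get()); // always exactly 1500
        }
    }
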
+
+    /**
+     * Stress test for concurrent evaluation of the same slice.
+     * 
+     * @param offset
+     *            The slice offset.
+     * @param limit
+     *            The slice limit.
+     * @param timeout
+     * @param ntrials
+     * @param poolSize
+     * 
+     * @return The #of successful trials.
+     * 
+     * @throws Exception
+     */
+    protected int doStressTest(final long offset, final long limit,
+            final long timeout, final int ntrials, final int poolSize)
+            throws Exception {
+
+        final IBindingSet[][] chunks = new IBindingSet[ntrials][];
+        {
+            final Random r = new Random();
+            final IBindingSet bset = EmptyBindingSet.INSTANCE;
+            for (int i = 0; i < chunks.length; i++) {
+                // random non-zero chunk size
+                chunks[i] = new IBindingSet[r.nextInt(10) + 1];
+                for (int j = 0; j < chunks[i].length; j++) {
+                    chunks[i][j] = bset;
+                }
+            }
+        }
+
+        final int bopId = 1;
+
+        final SliceOp query = new SliceOp(new BOp[] {}, NV.asMap(new NV[] {//
+                new NV(SliceOp.Annotations.BOP_ID, bopId),//
+                new NV(SliceOp.Annotations.OFFSET, offset),//
+                new NV(SliceOp.Annotations.LIMIT, limit),//
+        }));
+
+        final SliceStats stats = query.newStats();
+
+        final IRunningQuery q = new MockRunningQuery(null/* fed */, null/* indexManager */);
+
+        // start time in nanos.
+        final long begin = System.nanoTime();
+
+        // timeout in nanos.
+        final long nanos = TimeUnit.MILLISECONDS.toNanos(timeout);
+
+        final ThreadPoolExecutor service = (ThreadPoolExecutor) Executors
+                .newFixedThreadPool(poolSize);
+
+        try {
+
+            service.prestartAllCoreThreads();
+
+            final List<FutureTask<Void>> futures = new LinkedList<FutureTask<Void>>();
+
+            for (int i = 0; i < ntrials; i++) {
+
+                final IBindingSet[] chunk = chunks[i];
+
+                final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>(
+                        new IBindingSet[][] { chunk });
+
+                final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
+                        q, -1/* partitionId */, stats, source,
+                        new BlockingBuffer<IBindingSet[]>(chunk.length), null/* sink2 */);
+
+                final FutureTask<Void> ft = query.eval(context);
+
+                futures.add(ft);
+
+                service.execute(ft);
+
+            }
+
+            int nerror = 0;
+            int ncancel = 0;
+            int ntimeout = 0;
+            int nsuccess = 0;
+            int ninterrupt = 0;
+
+            for (FutureTask<Void> ft : futures) {
+
+                // remaining nanoseconds.
+                final long remaining = nanos - (System.nanoTime() - begin);
+
+                if (remaining <= 0)
+                    ft.cancel(true/* mayInterruptIfRunning */);
+
+                try {
+                    ft.get(remaining, TimeUnit.NANOSECONDS);
+                    nsuccess++;
+                } catch (CancellationException ex) {
+                    ncancel++;
+                } catch (TimeoutException ex) {
+                    ntimeout++;
+                } catch (ExecutionException ex) {
+                    if (InnerCause.isInnerCause(ex, InterruptedException.class)) {
+                        ninterrupt++;
+                    } else {
+                        log.error(ex, ex);
+                        nerror++;
+                    }
+                }
+
+            }
+
+            final long nseen = stats.nseen.get();
+
+            final long naccepted = stats.naccepted.get();
+
+            final long nexpected = limit;
+
+            final String msg = "offset=" + offset + ", limit=" + limit
+                    + ", nseen=" + nseen + ", naccepted=" + naccepted
+                    + ", nexpected=" + nexpected + ", nerror=" + nerror
+                    + ", ncancel=" + ncancel + ", ntimeout=" + ntimeout
+                    + ", ninterrupt=" + ninterrupt + ", nsuccess=" + nsuccess;
+
+            System.err.println(getClass().getName() + "." + getName() + " : "
+                    + msg);
+
+            if (nerror > 0)
+                fail(msg);
+
+            if (nexpected != naccepted)
+                fail(msg);
+
+            return nsuccess;
+
+        } finally {
+
+            service.shutdownNow();
+
+        }
+
+    }
+
 }


This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.