From: <tho...@us...> - 2010-09-24 13:38:39
Revision: 3620
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3620&view=rev
Author:   thompsonbry
Date:     2010-09-24 13:38:31 +0000 (Fri, 24 Sep 2010)

Log Message:
-----------
Fixed a nagging bug in the handling of multiple small chunks flowing through the pipeline. The problem turned out to be in SliceOp: the SliceTask was closing its sink after the first invocation, which caused the query to be interrupted (cancelled).
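For readers skimming the diff below, here is a minimal self-contained sketch of the failure mode. It is hypothetical: the Sink class and method names are illustrative stand-ins, not bigdata's IBlockingBuffer API.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class SharedSinkBug {

        /** Toy stand-in for the operator's shared output buffer. */
        static class Sink {
            final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
            volatile boolean open = true;
            void add(final String e) {
                if (!open) // a write on a closed sink cancels the query upstream
                    throw new IllegalStateException("sink closed");
                queue.add(e);
            }
            void close() { open = false; }
        }

        /** One operator invocation: copy a chunk onto the shared sink. */
        static void runOnce(final Sink sink, final String chunk) {
            try {
                sink.add(chunk);
            } finally {
                sink.close(); // the bug: each invocation closes the SHARED sink
            }
        }

        public static void main(final String[] args) {
            final Sink sink = new Sink();
            runOnce(sink, "chunk-1"); // first small chunk: fine
            try {
                runOnce(sink, "chunk-2"); // second small chunk: rejected
            } catch (IllegalStateException ex) {
                System.out.println("query interrupted: " + ex.getMessage());
            }
        }
    }

The fix in the SliceOp hunk below removes the close from the per-invocation finally block; only the query controller decides when the shared buffer may be closed.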
Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java	2010-09-24 13:38:31 UTC (rev 3620)
@@ -195,7 +195,7 @@
      * identifier for the {@link BOp} within the context of its owning
      * query.
      */
-    String BOP_ID = BOp.class.getName()+".bopId";
+    String BOP_ID = BOp.class.getName() + ".bopId";
 
     /**
      * The timeout for the operator evaluation (milliseconds).
@@ -210,8 +210,8 @@
      * be interpreted with respect to the time when the query began to
      * execute.
      */
-    String TIMEOUT = BOp.class.getName()+".timeout";
-    
+    String TIMEOUT = BOp.class.getName() + ".timeout";
+
     /**
      * The default timeout for operator evaluation.
      */
@@ -233,9 +233,9 @@
      * @see #TIMESTAMP
      */
     String MUTATION = BOp.class.getName() + ".mutation";
-    
+
     boolean DEFAULT_MUTATION = false;
-    
+
     /**
      * The timestamp (or transaction identifier) used by this operator if it
      * reads or writes on the database.

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java	2010-09-24 13:38:31 UTC (rev 3620)
@@ -30,6 +30,9 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.log4j.Level;
+import org.apache.log4j.Priority;
+
 import com.bigdata.bop.engine.BOpStats;
 import com.bigdata.bop.engine.QueryEngine;
 import com.bigdata.btree.IRangeQuery;
@@ -307,6 +310,22 @@
 
     }
 
+        /**
+         * You can uncomment a line in this method to see who is closing the
+         * buffer.
+         * <p>
+         * {@inheritDoc}
+         */
+        @Override
+        public void close() {
+
+//            if (isOpen())
+//                log.error(toString(), new RuntimeException("STACK TRACE"));
+
+            super.close();
+
+        }
+
     }
 
 }

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java	2010-09-24 13:38:31 UTC (rev 3620)
@@ -119,7 +119,7 @@
     public String toString() {
 
         final StringBuilder sb = new StringBuilder();
-        sb.append(getClass().getName());
+        sb.append(super.toString());
         sb.append("{chunksIn=" + chunksIn.get());
         sb.append(",unitsIn=" + unitsIn.get());
         sb.append(",chunksOut=" + chunksOut.get());

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java	2010-09-24 13:38:31 UTC (rev 3620)
@@ -413,7 +413,17 @@
                         continue;
                     }
                 } catch (InterruptedException e) {
-                    log.warn("Interrupted.");
+                    /*
+                     * Note: Uncomment the stack trace here if you want to find
+                     * where the query was interrupted.
+                     *
+                     * Note: If you want to find out who interrupted the query,
+                     * then you can instrument BlockingBuffer#close() in
+                     * PipelineOp#newBuffer(stats).
+                     */
+                    log.warn("Interrupted."
+//                            ,e
+                            );
                     return;
                 } catch (Throwable ex) {
                     // log and continue

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryResultIterator.java	2010-09-24 13:38:31 UTC (rev 3620)
@@ -52,10 +52,11 @@
              * Note: DO NOT halt the query here!!!! That will cause it to not
              * accept any more messages. Just close the source iterator.
              */
+            src.close();
 //            try {
 //                runningQuery.halt();
 //            } finally {
-            src.close();
+//                src.close();
 //            }
         }
     }

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java	2010-09-24 13:38:31 UTC (rev 3620)
@@ -338,9 +338,6 @@
 
         messagesProduced(msg.getBOpId(), 1/* nmessages */);
 
-        if (log.isInfoEnabled())
-            log.info(msg.toString());
-
         if (TableLog.tableLog.isInfoEnabled()) {
             /*
              * Note: RunState is only used by the query controller so this will
@@ -359,9 +356,12 @@
                     null/* cause */, null/* stats */));
         }
 
-        if (debug)
-            System.err.println("startQ : " + toString());
+        if (log.isInfoEnabled())
+            log.info("startQ : " + toString());
 
+        if (log.isTraceEnabled())
+            log.trace(msg.toString());
+
     }
 
     /**
@@ -397,19 +397,18 @@
 
         messagesConsumed(msg.bopId, msg.nmessages);
 
-        if (log.isTraceEnabled())
-            log.trace(msg.toString());
-
         if (TableLog.tableLog.isInfoEnabled()) {
             TableLog.tableLog.info(getTableRow("startOp", msg.serviceId,
                     msg.bopId, msg.partitionId, msg.nmessages/* fanIn */,
                     null/* cause */, null/* stats */));
         }
 
-        if (debug)
-            System.err
-                    .println("startOp: " + toString() + " : bop=" + msg.bopId);
+        if (log.isInfoEnabled())
+            log.info("startOp: " + toString() + " : bop=" + msg.bopId);
 
+        if (log.isTraceEnabled())
+            log.trace(msg.toString());
+
         return firstTime;
 
     }
@@ -470,9 +469,6 @@
 
         if (isAllDone)
             this.allDone.set(true);
 
-        if (log.isTraceEnabled())
-            log.trace(msg.toString());
-
         if (TableLog.tableLog.isInfoEnabled()) {
             final int fanOut = msg.sinkMessagesOut + msg.altSinkMessagesOut;
             TableLog.tableLog.info(getTableRow("haltOp", msg.serviceId,
@@ -480,10 +476,13 @@
                     msg.taskStats));
         }
 
-        if (debug)
-            System.err.println("haltOp : " + toString() + " : bop=" + msg.bopId
+        if (log.isInfoEnabled())
+            log.info("haltOp : " + toString() + " : bop=" + msg.bopId
                     + ",isOpDone=" + isOpDone);
 
+        if (log.isTraceEnabled())
+            log.trace(msg.toString());
+
         if (msg.cause != null) {
 
             /*

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java	2010-09-24 13:38:31 UTC (rev 3620)
@@ -275,70 +275,69 @@
 
         public Void call() throws Exception {
 
-            if(log.isTraceEnabled())
-                log.trace(toString());
-
             final IAsynchronousIterator<IBindingSet[]> source = context
                     .getSource();
 
             final IBlockingBuffer<IBindingSet[]> sink = context.getSink();
 
-            try {
+            /*
+             * buffer forms chunks which get flushed onto the sink.
+             *
+             * @todo if we have visibility into the #of source chunks, then do
+             * not buffer more than min(#source,#needed).
+             */
+            final UnsynchronizedArrayBuffer<IBindingSet> out = new UnsynchronizedArrayBuffer<IBindingSet>(
+                    sink, op.getChunkCapacity());
 
+            while (source.hasNext()) {
+
+                final IBindingSet[] chunk = source.next();
+
                 /*
-                 * buffer forms chunks which get flushed onto the sink.
+                 * Batch each chunk through a lock for better concurrency
+                 * (avoids CAS contention).
                  *
-                 * @todo if we have visibility into the #of source chunks, then
-                 * do not buffer more than min(#source,#needed).
+                 * Note: This is safe because the source chunk is already
+                 * materialized and the sink will not block (that is part of the
+                 * bop evaluation contract).
+                 *
+                 * Note: We need to be careful here with concurrent close of the
+                 * sink (which is the shared queryBuffer) by concurrent
+                 * SliceOps. The problem is that the slice can count off the
+                 * solutions without having them flushed all the way through to
+                 * the queryBuffer, but we can not close the query buffer until
+                 * we actually see the last solution added to the query buffer.
+                 * This is why the slice flushes the buffer while it is
+                 * synchronized.
                  */
-                final UnsynchronizedArrayBuffer<IBindingSet> out = new UnsynchronizedArrayBuffer<IBindingSet>(
-                        sink, op.getChunkCapacity());
+                synchronized (stats) {
 
-                boolean halt = false;
-
-                while (source.hasNext() && !halt) {
+                    if (log.isTraceEnabled())
+                        log.trace(toString() + ": stats=" + stats + ", sink="
+                                + sink);
 
-                    final IBindingSet[] chunk = source.next();
+                    final boolean halt = handleChunk(out, chunk);
 
-                    /*
-                     * Batch each chunk through a lock for better concurrency
-                     * (avoids CAS contention).
-                     *
-                     * Note: This is safe because the source chunk is already
-                     * materialized and the sink will not block (that is part of
-                     * the bop evaluation contract).
-                     */
-                    synchronized (stats) {
-
-                        if (handleChunk(out, chunk)) {
+                    if (!out.isEmpty())
+                        out.flush();
 
-                            halt = true;
+                    sink.flush();
 
-                        }
+                    if (halt) {
 
-                    }
+                        if (log.isInfoEnabled())
+                            log.info("Slice will interrupt query.");
 
-                }
+                        context.getRunningQuery().halt();
 
-                if (!out.isEmpty())
-                    out.flush();
+                    }
 
-                sink.flush();
-
-                if (halt) {
-//                    log.error("Slice will interrupt query.");// FIXME comment out this line.
-                    context.getRunningQuery().halt();//throw new InterruptedException();
                 }
 
-                // cancelQuery();
-
-                return null;
-
-            } finally {
-
-                sink.close();
-
-            }
 
+            }
+
+            return null;
+
         }
 
     /**
@@ -400,6 +399,8 @@
 
             stats.chunksIn.increment();
 
+//            int nadded = 0;
+
             for (int i = 0; i < chunk.length; i++) {
 
                 if (stats.naccepted.get() >= limit)
@@ -420,6 +421,8 @@
 
                     out.add(bset);
 
+//                    nadded++;
+
                     stats.naccepted.incrementAndGet();
 
                     if (log.isTraceEnabled())
@@ -428,29 +431,14 @@
 
                 }
 
             } // next bindingSet
-            
+
             return false;
 
         }
 
-//        /**
-//         * Cancel the query evaluation. This is invoked when the slice has been
-//         * satisfied. At that point we want to halt not only the {@link SliceOp}
-//         * but also the entire query since it does not need to produce any more
-//         * results.
- // */ - // private void cancelQuery() { - // - // context.halt(); - // - // } - public String toString() { - return getClass().getName() + "{offset=" + offset + ",limit=" + return super.toString() + "{offset=" + offset + ",limit=" + limit + ",nseen=" + stats.nseen + ",naccepted=" + stats.naccepted + "}"; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java 2010-09-23 20:22:50 UTC (rev 3619) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java 2010-09-24 13:38:31 UTC (rev 3620) @@ -475,7 +475,8 @@ final StringBuilder sb = new StringBuilder(); - sb.append("BlockingBuffer"); + sb.append(super.toString()); +// sb.append("BlockingBuffer"); sb.append("{ open=" + open); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties 2010-09-23 20:22:50 UTC (rev 3619) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties 2010-09-24 13:38:31 UTC (rev 3620) @@ -91,13 +91,13 @@ log4j.logger.com.bigdata.util.concurrent.Haltable=ALL -log4j.logger.com.bigdata.bop=ALL +#log4j.logger.com.bigdata.bop=ALL #log4j.logger.com.bigdata.bop.join.PipelineJoin=ALL -#log4j.logger.com.bigdata.bop.solutions.SliceOp=ALL +#log4j.logger.com.bigdata.bop.solutions.SliceOp=ALL,destPlain #log4j.logger.com.bigdata.bop.engine=ALL #log4j.logger.com.bigdata.bop.engine.QueryEngine=ALL #log4j.logger.com.bigdata.bop.engine.RunningQuery=ALL -#log4j.logger.com.bigdata.bop.engine.RunState=ALL +log4j.logger.com.bigdata.bop.engine.RunState=INFO #log4j.logger.com.bigdata.bop.engine.RunningQuery$ChunkTask=ALL #log4j.logger.com.bigdata.bop.fed.FederatedQueryEngine=ALL #log4j.logger.com.bigdata.bop.fed.FederatedRunningQuery=ALL @@ -215,6 +215,11 @@ log4j.appender.dest2.layout=org.apache.log4j.PatternLayout log4j.appender.dest2.layout.ConversionPattern=%-5p: %r %X{hostname} %X{serviceUUID} %X{taskname} %X{timestamp} %X{resources} %t %l: %m%n +## destPlain +#log4j.appender.destPlain=org.apache.log4j.ConsoleAppender +#log4j.appender.destPlain.layout=org.apache.log4j.PatternLayout +#log4j.appender.destPlain.layout.ConversionPattern= + ## # BOp run state trace (tab delimited file). Uncomment the next line to enable. 
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java	2010-09-24 13:38:31 UTC (rev 3620)
@@ -475,7 +475,8 @@
 
         final StringBuilder sb = new StringBuilder();
 
-        sb.append("BlockingBuffer");
+        sb.append(super.toString());
+//        sb.append("BlockingBuffer");
 
         sb.append("{ open=" + open);

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties	2010-09-24 13:38:31 UTC (rev 3620)
@@ -91,13 +91,13 @@
 
 log4j.logger.com.bigdata.util.concurrent.Haltable=ALL
 
-log4j.logger.com.bigdata.bop=ALL
+#log4j.logger.com.bigdata.bop=ALL
 #log4j.logger.com.bigdata.bop.join.PipelineJoin=ALL
-#log4j.logger.com.bigdata.bop.solutions.SliceOp=ALL
+#log4j.logger.com.bigdata.bop.solutions.SliceOp=ALL,destPlain
 #log4j.logger.com.bigdata.bop.engine=ALL
 #log4j.logger.com.bigdata.bop.engine.QueryEngine=ALL
 #log4j.logger.com.bigdata.bop.engine.RunningQuery=ALL
-#log4j.logger.com.bigdata.bop.engine.RunState=ALL
+log4j.logger.com.bigdata.bop.engine.RunState=INFO
 #log4j.logger.com.bigdata.bop.engine.RunningQuery$ChunkTask=ALL
 #log4j.logger.com.bigdata.bop.fed.FederatedQueryEngine=ALL
 #log4j.logger.com.bigdata.bop.fed.FederatedRunningQuery=ALL
@@ -215,6 +215,11 @@
 log4j.appender.dest2.layout=org.apache.log4j.PatternLayout
 log4j.appender.dest2.layout.ConversionPattern=%-5p: %r %X{hostname} %X{serviceUUID} %X{taskname} %X{timestamp} %X{resources} %t %l: %m%n
 
+## destPlain
+#log4j.appender.destPlain=org.apache.log4j.ConsoleAppender
+#log4j.appender.destPlain.layout=org.apache.log4j.PatternLayout
+#log4j.appender.destPlain.layout.ConversionPattern=
+
 ##
 # BOp run state trace (tab delimited file). Uncomment the next line to enable.
 log4j.logger.com.bigdata.bop.engine.RunState$TableLog=INFO,queryRunStateLog

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java	2010-09-24 13:38:31 UTC (rev 3620)
@@ -46,10 +46,6 @@
 
     private final IIndexManager indexManager;
 
-//    private final long readTimestamp;
-//
-//    private final long writeTimestamp;
-
     /**
     * Note: This constructor DOES NOT check its arguments so unit tests may be
     * written with the minimum dependencies
@@ -60,13 +56,10 @@
      * @param writeTimestamp
      */
     public MockRunningQuery(final IBigdataFederation<?> fed,
-            final IIndexManager indexManager/*, final long readTimestamp,
-            final long writeTimestamp*/) {
+            final IIndexManager indexManager) {
 
         this.fed = fed;
 
         this.indexManager = indexManager;
 
-//        this.readTimestamp = readTimestamp;
-//        this.writeTimestamp = writeTimestamp;
-
     }
 
@@ -78,14 +71,6 @@
         return indexManager;
     }
 
-//    public long getReadTimestamp() {
-//        return readTimestamp;
-//    }
-//
-//    public long getWriteTimestamp() {
-//        return writeTimestamp;
-//    }
-
     /**
      * NOP (you have to test things like slices with a full integration).
      */

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java	2010-09-24 13:38:31 UTC (rev 3620)
@@ -405,6 +405,7 @@
         final int startId = 1;
         final int joinId = 2;
         final int predId = 3;
+        final int sliceId = 4;
 
         /*
          * Enforce a constraint on the source such that it hands 3 each source
@@ -442,7 +443,6 @@
                 })//
         );
 
-        final int sliceId = 4;
         final SliceOp sliceOp = new SliceOp(new BOp[] { joinOp },
                 // slice annotations
                 NV.asMap(new NV[] {//
@@ -509,6 +509,8 @@
                         startId, -1 /* partitionId */,
                         newBindingSetIterator(sources)));
 
+//        runningQuery.get();
+
         // verify solutions.
         assertSameSolutionsAnyOrder(expected, new Dechunkerator<IBindingSet>(
                 runningQuery.iterator()));
@@ -686,6 +688,131 @@
         fail("write test");
 
     }
+
+    /**
+     * Unit test runs chunks into a slice without a limit. This verifies that
+     * the query terminates properly even though the slice is willing to accept
+     * more data.
+     *
+     * @throws Exception
+     */
+    public void test_query_slice_noLimit() throws Exception {
+
+        final Var<?> x = Var.var("x");
+        final Var<?> y = Var.var("y");
+
+        final int startId = 1;
+        final int sliceId = 2;
+
+        /*
+         * Enforce a constraint on the source such that it hands 3 each source
+         * chunk to the join operator as a separate chunk
+         */
+        final int nsources = 4;
+        final StartOp startOp = new StartOp(new BOp[] {}, NV.asMap(new NV[] {//
+                new NV(Predicate.Annotations.BOP_ID, startId),//
+                new NV(PipelineOp.Annotations.CHUNK_CAPACITY, 1),//
+                new NV(PipelineOp.Annotations.CHUNK_OF_CHUNKS_CAPACITY, nsources),//
+                new NV(QueryEngineTestAnnotations.ONE_MESSAGE_PER_CHUNK, true),//
+                }));
+
+        final SliceOp sliceOp = new SliceOp(new BOp[] { startOp },
+                // slice annotations
+                NV.asMap(new NV[] { //
+                        new NV(BOp.Annotations.BOP_ID, sliceId),//
+                        new NV(SliceOp.Annotations.OFFSET, 0L),//
+                        new NV(SliceOp.Annotations.LIMIT, Long.MAX_VALUE),//
+                        })//
+                );
+
+        // the source data.
+        final IBindingSet[] source = new IBindingSet[] {//
+                new ArrayBindingSet(//
+                        new IVariable[] { x, y },//
+                        new IConstant[] { new Constant<String>("John"),
+                                new Constant<String>("Mary") }//
+                ),//
+                new ArrayBindingSet(//
+                        new IVariable[] { x, y },//
+                        new IConstant[] { new Constant<String>("Leon"),
+                                new Constant<String>("Paul") }//
+                ),//
+                new ArrayBindingSet(//
+                        new IVariable[] { x, y },//
+                        new IConstant[] { new Constant<String>("Paul"),
+                                new Constant<String>("Mary") }//
+                ),//
+                new ArrayBindingSet(//
+                        new IVariable[] { x, y },//
+                        new IConstant[] { new Constant<String>("Paul"),
+                                new Constant<String>("Mark") }//
+                )};
+
+        // Put each source binding set into a chunk by itself.
+        final IBindingSet[][] sources = new IBindingSet[source.length][];
+        for (int i = 0; i < sources.length; i++) {
+            sources[i] = new IBindingSet[] { source[i] };
+        }
+        assertEquals(nsources, source.length);
+        assertEquals(nsources, sources.length);
+
+        final BindingSetPipelineOp query = sliceOp;
+        final UUID queryId = UUID.randomUUID();
+        final RunningQuery runningQuery = queryEngine.eval(queryId, query,
+                new LocalChunkMessage<IBindingSet>(queryEngine, queryId,
+                        startId, -1 /* partitionId */,
+                        newBindingSetIterator(sources)));
+
+        //
+        //
+        //
+
+        // the expected solutions.
+        final IBindingSet[] expected = source;
+
+        // verify solutions.
+        assertSameSolutionsAnyOrder(expected, new Dechunkerator<IBindingSet>(
+                runningQuery.iterator()));
+
+        // Wait until the query is done.
+        runningQuery.get();
+
+        final Map<Integer, BOpStats> statsMap = runningQuery.getStats();
+        {
+            // validate the stats map.
+            assertNotNull(statsMap);
+            assertEquals(2, statsMap.size());
+            if (log.isInfoEnabled())
+                log.info(statsMap.toString());
+        }
+
+        // validate the stats for the start operator.
+        {
+            final BOpStats stats = statsMap.get(startId);
+            assertNotNull(stats);
+            if (log.isInfoEnabled())
+                log.info("start: " + stats.toString());
+
+            // verify query solution stats details.
+            assertEquals((long)nsources, stats.chunksIn.get());
+            assertEquals((long)nsources, stats.unitsIn.get());
+            assertEquals((long)nsources, stats.unitsOut.get());
+            assertEquals((long)nsources, stats.chunksOut.get());
+        }
+
+        // validate the stats for the slice operator.
+        {
+            final BOpStats stats = statsMap.get(sliceId);
+            assertNotNull(stats);
+            if (log.isInfoEnabled())
+                log.info("slice: " + stats.toString());
+
+            // verify query solution stats details.
+            assertEquals((long)nsources, stats.chunksIn.get());
+            assertEquals((long)nsources, stats.unitsIn.get());
+            assertEquals((long)nsources, stats.unitsOut.get());
+            assertEquals((long)nsources, stats.chunksOut.get());
+        }
+
+    }
 
     /**
      * Run a join with a slice. The slice is always evaluated on the query

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java	2010-09-23 20:22:50 UTC (rev 3619)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java	2010-09-24 13:38:31 UTC (rev 3620)
@@ -54,13 +54,14 @@
 import com.bigdata.bop.Var;
 import com.bigdata.bop.engine.BOpStats;
 import com.bigdata.bop.engine.IRunningQuery;
-import com.bigdata.bop.engine.MockRunningQuery;
 import com.bigdata.bop.engine.TestQueryEngine;
 import com.bigdata.bop.solutions.SliceOp.SliceStats;
+import com.bigdata.journal.IIndexManager;
 import com.bigdata.relation.accesspath.BlockingBuffer;
 import com.bigdata.relation.accesspath.IAsynchronousIterator;
 import com.bigdata.relation.accesspath.IBlockingBuffer;
 import com.bigdata.relation.accesspath.ThickAsynchronousIterator;
+import com.bigdata.service.IBigdataFederation;
 import com.bigdata.util.InnerCause;
 
 /**
@@ -219,7 +220,7 @@
 
         final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
                 new MockRunningQuery(null/* fed */, null/* indexManager */
-                ), -1/* partitionId */, stats,
+                , sink), -1/* partitionId */, stats,
                 source, sink, null/* sink2 */);
 
         // get task.
@@ -311,7 +312,7 @@
 
         final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
                 new MockRunningQuery(null/* fed */, null/* indexManager */
-                ), -1/* partitionId */, stats,
+                , sink), -1/* partitionId */, stats,
                 source, sink, null/* sink2 */);
 
         // get task.
@@ -323,18 +324,7 @@
         assertTrue(ft.isDone());
         assertFalse(ft.isCancelled());
 
-//        try {
-            ft.get(); // verify nothing thrown.
-//            fail("Expecting inner cause : " + InterruptedException.class);
-//        } catch (Throwable t) {
-//            if (InnerCause.isInnerCause(t, InterruptedException.class)) {
-//                if (log.isInfoEnabled())
-//                    log.info("Ignoring expected exception: " + t, t);
-//            } else {
-//                fail("Expecting inner cause " + InterruptedException.class
-//                        + ", not " + t, t);
-//            }
-//        }
+        ft.get(); // verify nothing thrown.
 
         assertEquals(limit, stats.naccepted.get());
         assertEquals(offset+limit, stats.nseen.get());
@@ -346,6 +336,171 @@
 
     }
 
+    /**
+     * Unit test where the offset is never satisfied. For this test, all binding
+     * sets will be consumed but none will be emitted.
+     *
+     * @throws InterruptedException
+     * @throws ExecutionException
+     */
+    public void test_slice_offsetNeverSatisfied() throws InterruptedException,
+            ExecutionException {
+
+        final int bopId = 1;
+
+        final long offset = 100L;
+        final long limit = 3L;
+
+        final SliceOp query = new SliceOp(new BOp[] {}, NV.asMap(new NV[] {//
+                new NV(SliceOp.Annotations.BOP_ID, bopId),//
+                new NV(SliceOp.Annotations.OFFSET, offset),//
+                new NV(SliceOp.Annotations.LIMIT, limit),//
+                }));
+
+        assertEquals("offset", offset, query.getOffset());
+
+        assertEquals("limit", limit, query.getLimit());
+
+        // the expected solutions (none)
+        final IBindingSet[] expected = new IBindingSet[0];
+
+        final SliceStats stats = query.newStats();
+
+        final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>(
+                new IBindingSet[][] { data.toArray(new IBindingSet[0]) });
+
+        final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer(stats);
+
+        final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
+                new MockRunningQuery(null/* fed */, null/* indexManager */
+                , sink), -1/* partitionId */, stats, source, sink, null/* sink2 */);
+
+        // get task.
+        final FutureTask<Void> ft = query.eval(context);
+
+        ft.run();
+
+        /*
+         * Note: When the slice does not have a limit (or if we write a test
+         * where the #of source binding sets can not satisfy the offset and/or
+         * limit) then the sink WILL NOT be closed by the slice. Therefore, in
+         * order for the iterator to terminate we first check the Future of the
+         * SliceTask and then _close_ the sink before consuming the iterator.
+         */
+        assertTrue(ft.isDone());
+        assertFalse(ft.isCancelled());
+        ft.get(); // verify nothing thrown.
+        sink.close(); // close the sink so the iterator will terminate!
+
+        TestQueryEngine.assertSameSolutions(expected, sink.iterator());
+
+        assertEquals(1L, stats.chunksIn.get());
+        assertEquals(6L, stats.unitsIn.get());
+        assertEquals(0L, stats.unitsOut.get());
+        assertEquals(0L, stats.chunksOut.get());
+        assertEquals(6L, stats.nseen.get());
+        assertEquals(0L, stats.naccepted.get());
+
+    }
+
+    /**
+     * Unit test where the offset plus the limit is never satisfied. For this
+     * test, all binding sets will be consumed and some will be emitted, but the
+     * slice is never satisfied.
+     *
+     * @throws InterruptedException
+     * @throws ExecutionException
+     */
+    public void test_slice_offsetPlusLimitNeverSatisfied() throws InterruptedException,
+            ExecutionException {
+
+        final Var<?> x = Var.var("x");
+        final Var<?> y = Var.var("y");
+
+        final int bopId = 1;
+
+        final long offset = 2L;
+        final long limit = 10L;
+
+        final SliceOp query = new SliceOp(new BOp[] {}, NV.asMap(new NV[] {//
+                new NV(SliceOp.Annotations.BOP_ID, bopId),//
+                new NV(SliceOp.Annotations.OFFSET, offset),//
+                new NV(SliceOp.Annotations.LIMIT, limit),//
+                }));
+
+        assertEquals("offset", offset, query.getOffset());
+
+        assertEquals("limit", limit, query.getLimit());
+
+        // the expected solutions
+        final IBindingSet[] expected = new IBindingSet[] {//
+                new ArrayBindingSet(//
+                        new IVariable[] { x, y },//
+                        new IConstant[] { new Constant<String>("Mary"),
+                                new Constant<String>("Jane") }//
+                ),//
+                new ArrayBindingSet(//
+                        new IVariable[] { x, y },//
+                        new IConstant[] { new Constant<String>("Paul"),
+                                new Constant<String>("Leon") }//
+                ),//
+                new ArrayBindingSet(//
+                        new IVariable[] { x, y },//
+                        new IConstant[] { new Constant<String>("Paul"),
+                                new Constant<String>("John") }//
+                ),//
+                new ArrayBindingSet(//
+                        new IVariable[] { x, y },//
+                        new IConstant[] { new Constant<String>("Leon"),
+                                new Constant<String>("Paul") }//
+                ),//
+        };
+
+        final SliceStats stats = query.newStats();
+
+        final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>(
+                new IBindingSet[][] { data.toArray(new IBindingSet[0]) });
+
+        final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer(stats);
+
+        final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
+                new MockRunningQuery(null/* fed */, null/* indexManager */
+                , sink), -1/* partitionId */, stats, source, sink, null/* sink2 */);
+
+        // get task.
+        final FutureTask<Void> ft = query.eval(context);
+
+        ft.run();
+
+        /*
+         * Note: When the slice does not have a limit (or if we write a test
+         * where the #of source binding sets can not satisfy the offset and/or
+         * limit) then the sink WILL NOT be closed by the slice. Therefore, in
+         * order for the iterator to terminate we first check the Future of the
+         * SliceTask and then _close_ the sink before consuming the iterator.
+         */
+        assertTrue(ft.isDone());
+        assertFalse(ft.isCancelled());
+        ft.get(); // verify nothing thrown.
+        sink.close(); // close the sink so the iterator will terminate!
+
+        TestQueryEngine.assertSameSolutions(expected, sink.iterator());
+
+        assertEquals(1L, stats.chunksIn.get());
+        assertEquals(6L, stats.unitsIn.get());
+        assertEquals(4L, stats.unitsOut.get());
+        assertEquals(1L, stats.chunksOut.get());
+        assertEquals(6L, stats.nseen.get());
+        assertEquals(4L, stats.naccepted.get());
+
+    }
+
+    /**
+     * Unit test where the slice accepts everything.
+     *
+     * @throws InterruptedException
+     * @throws ExecutionException
+     */
     public void test_slice_offset0_limitAll() throws InterruptedException,
             ExecutionException {
 
@@ -353,8 +508,8 @@
 
         final SliceOp query = new SliceOp(new BOp[] {}, NV.asMap(new NV[] {//
                 new NV(SliceOp.Annotations.BOP_ID, bopId),//
-//                new NV(SliceOp.Annotations.OFFSET, 1L),//
-//                new NV(SliceOp.Annotations.LIMIT, 3L),//
+                // new NV(SliceOp.Annotations.OFFSET, 1L),//
+                // new NV(SliceOp.Annotations.LIMIT, 3L),//
                 }));
 
         assertEquals("offset", 0L, query.getOffset());
@@ -362,8 +517,8 @@
 
         assertEquals("limit", Long.MAX_VALUE, query.getLimit());
 
         // the expected solutions
-        final IBindingSet[] expected = data.toArray(new IBindingSet[0]);
-        
+        final IBindingSet[] expected = data.toArray(new IBindingSet[0]);
+
         final SliceStats stats = query.newStats();
 
         final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>(
@@ -373,19 +528,27 @@
 
         final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
                 new MockRunningQuery(null/* fed */, null/* indexManager */
-                ), -1/* partitionId */, stats, source, sink, null/* sink2 */);
+                , sink), -1/* partitionId */, stats, source, sink, null/* sink2 */);
 
         // get task.
         final FutureTask<Void> ft = query.eval(context);
 
         ft.run();
 
-        TestQueryEngine.assertSameSolutions(expected, sink.iterator());
-
+        /*
+         * Note: When the slice does not have a limit (or if we write a test
+         * where the #of source binding sets can not satisfy the offset and/or
+         * limit) then the sink WILL NOT be closed by the slice. Therefore, in
+         * order for the iterator to terminate we first check the Future of the
+         * SliceTask and then _close_ the sink before consuming the iterator.
+         */
         assertTrue(ft.isDone());
         assertFalse(ft.isCancelled());
         ft.get(); // verify nothing thrown.
+        sink.close(); // close the sink so the iterator will terminate!
 
+        TestQueryEngine.assertSameSolutions(expected, sink.iterator());
+
         assertEquals(1L, stats.chunksIn.get());
         assertEquals(6L, stats.unitsIn.get());
         assertEquals(6L, stats.unitsOut.get());
@@ -395,7 +558,8 @@
 
     }
 
-    public void test_slice_correctRejection_badOffset() throws InterruptedException {
+    public void test_slice_correctRejection_badOffset()
+            throws InterruptedException {
 
         final int bopId = 1;
 
@@ -418,7 +582,7 @@
 
         final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
                 new MockRunningQuery(null/* fed */, null/* indexManager */
-                ), -1/* partitionId */, stats, source, sink, null/* sink2 */);
+                , sink), -1/* partitionId */, stats, source, sink, null/* sink2 */);
 
         // get task.
         try {
@@ -455,7 +619,7 @@
 
         final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
                 new MockRunningQuery(null/* fed */, null/* indexManager */
-                ), -1/* partitionId */, stats, source, sink, null/* sink2 */);
+                , sink), -1/* partitionId */, stats, source, sink, null/* sink2 */);
 
         // get task.
         try {
@@ -516,8 +680,6 @@
 
         final SliceStats stats = query.newStats();
 
-        final IRunningQuery q = new MockRunningQuery(null/* fed */, null/* indexManager */);
-
         // start time in nanos.
         final long begin = System.nanoTime();
 
@@ -540,9 +702,14 @@
             final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>(
                     new IBindingSet[][] { chunk });
 
+            final IBlockingBuffer<IBindingSet[]> sink = new BlockingBuffer<IBindingSet[]>(
+                    chunk.length);
+
+            final IRunningQuery q = new MockRunningQuery(null/* fed */,
+                    null/* indexManager */, sink);
+
             final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>(
-                    q, -1/* partitionId */, stats, source,
-                    new BlockingBuffer<IBindingSet[]>(chunk.length), null/* sink2 */);
+                    q, -1/* partitionId */, stats, source, sink, null/* sink2 */);
 
             final FutureTask<Void> ft = query.eval(context);
 
@@ -610,4 +777,29 @@
 
     }
 
+    private static class MockRunningQuery extends
+            com.bigdata.bop.engine.MockRunningQuery {
+
+        private final IBlockingBuffer<IBindingSet[]> sink;
+
+        public MockRunningQuery(final IBigdataFederation<?> fed,
+                final IIndexManager indexManager,
+                final IBlockingBuffer<IBindingSet[]> sink) {
+
+            super(fed, indexManager);
+
+            this.sink = sink;
+
+        }
+
+        /**
+         * Overridden to close the sink so the slice will terminate.
+         */
+        @Override
+        public void halt() {
+            sink.close();
+        }
+
+    }
+
 }
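The new TestSliceOp cases above pin down the offset/limit arithmetic: a solution with zero-based index i is emitted iff offset <= i < offset + limit. A small sketch of that rule (hypothetical helper, not part of the commit), reproducing the expected counts over the 6-solution test fixture:

    public class SliceArithmetic {

        /** #of solutions a slice emits given offset, limit and n source solutions. */
        static long accepted(final long offset, final long limit, final long n) {
            // offset+limit with an overflow guard (limit may be Long.MAX_VALUE).
            final long end = (limit > Long.MAX_VALUE - offset) ? Long.MAX_VALUE
                    : offset + limit;
            return Math.min(end, n) - Math.min(offset, n);
        }

        public static void main(final String[] args) {
            // offset never satisfied: nothing emitted.
            System.out.println(accepted(100, 3, 6)); // 0
            // offset+limit never satisfied: some emitted, slice never halts the query.
            System.out.println(accepted(2, 10, 6)); // 4
            // no offset/limit: everything emitted.
            System.out.println(accepted(0, Long.MAX_VALUE, 6)); // 6
        }
    }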