From: <tho...@us...> - 2010-09-16 20:29:58
|
Revision: 3574 http://bigdata.svn.sourceforge.net/bigdata/?rev=3574&view=rev Author: thompsonbry Date: 2010-09-16 20:29:51 +0000 (Thu, 16 Sep 2010) Log Message: ----------- Added a unit test for the query engine in which an IConstraint is applied. Identified a problem with multiple concurrent evaluation of SliceOp. It needs to be modified to use the same state for each invocation and to use CATs (or AtomicLong or chunk-wise locking, or bop invocation wise locking) to prevent concurrency failures (such as letting through too many solutions). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/EQConstant.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-16 19:43:08 UTC (rev 3573) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-16 20:29:51 UTC (rev 3574) @@ -638,6 +638,14 @@ } + if (log.isTraceEnabled()) { + + log.debug("Accepted by " + + constraint.getClass().getSimpleName() + " : " + + bindingSet); + + } + } return true; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/EQConstant.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/EQConstant.java 2010-09-16 19:43:08 UTC (rev 3573) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/EQConstant.java 2010-09-16 20:29:51 UTC (rev 3574) @@ -70,15 +70,21 @@ } - public boolean accept(final IBindingSet s) { + public boolean accept(final IBindingSet bset) { + final IVariable<?> var = (IVariable<?>) get(0)/* var */; + // get binding for the variable. - final IConstant<?> tmp = s.get((IVariable<?>) get(0)/* var */); + final IConstant<?> asBound = bset.get(var); - if (tmp == null) + if (asBound == null) return true; // not yet bound. - return tmp.equals(get(1)); + final IConstant<?> cnst = (IConstant<?>) get(1); + + final boolean ret = asBound.equals(cnst); + + return ret; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-16 19:43:08 UTC (rev 3573) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-16 20:29:51 UTC (rev 3574) @@ -1730,10 +1730,14 @@ } if (log.isDebugEnabled()) - log.debug("Accepted element for " + naccepted - + " of " + bindingSets.length - + " possible bindingSet combinations: " - + e.toString() + ", joinOp=" + joinOp); + if (naccepted == 0) { + log.debug("Rejected element: " + e.toString()); + } else { + log.debug("Accepted element for " + naccepted + + " of " + bindingSets.length + + " possible bindingSet combinations: " + + e.toString()); + } } // if something is accepted in the chunk return true. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-16 19:43:08 UTC (rev 3573) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-16 20:29:51 UTC (rev 3574) @@ -50,6 +50,7 @@ import com.bigdata.bop.HashBindingSet; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstant; +import com.bigdata.bop.IConstraint; import com.bigdata.bop.IVariable; import com.bigdata.bop.IVariableOrConstant; import com.bigdata.bop.NV; @@ -59,6 +60,7 @@ import com.bigdata.bop.ap.R; import com.bigdata.bop.bset.ConditionalRoutingOp; import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.constraint.EQConstant; import com.bigdata.bop.fed.TestFederatedQueryEngine; import com.bigdata.bop.join.PipelineJoin; import com.bigdata.bop.solutions.SliceOp; @@ -68,6 +70,7 @@ import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.striterator.ChunkedArrayIterator; +import com.bigdata.striterator.Dechunkerator; import com.bigdata.striterator.ICloseableIterator; import com.bigdata.util.concurrent.LatchedExecutor; import com.ibm.icu.impl.ByteBuffer; @@ -515,6 +518,173 @@ } /** + * A join with an {@link IConstraint}. + */ + public void test_query_join_withConstraint() throws Exception { + + final Var<?> x = Var.var("x"); + final Var<?> y = Var.var("y"); + + final int startId = 1; + final int joinId = 2; + final int predId = 3; + final int sliceId = 4; + + final StartOp startOp = new StartOp(new BOp[] {}, NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + })); + + /* + * + * Note: Since the index on which this reads is formed as (column1 + + * column2) the probe key will be [null] if it does not bind the first + * column. Therefore, in order to have the 2nd column constraint we have + * to model it as an IElementFilter on the predicate. + */ + final Predicate<E> predOp = new Predicate<E>(new IVariableOrConstant[] { + x, y}, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.PARTITION_ID, Integer + .valueOf(-1)),// + new NV(Predicate.Annotations.OPTIONAL, Boolean.FALSE),// + new NV(Predicate.Annotations.CONSTRAINT,null),// + new NV(Predicate.Annotations.EXPANDER, null),// + new NV(Predicate.Annotations.BOP_ID, predId),// + new NV(Predicate.Annotations.TIMESTAMP, + ITx.READ_COMMITTED),// + new NV(Predicate.Annotations.KEY_ORDER, + R.primaryKeyOrder),// + })); + + final PipelineJoin<E> joinOp = new PipelineJoin<E>(startOp/* left */, + predOp/* right */, + // join annotations + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, joinId),// + // impose constraint on the join. + new NV(PipelineJoin.Annotations.CONSTRAINTS, + new IConstraint[] { new EQConstant(y, + new Constant<String>("Paul")) }),// + })// + ); + + final BindingSetPipelineOp query = new SliceOp(new BOp[] { joinOp }, + // slice annotations + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, sliceId),// + })// + ); + + // the expected solutions (order is not reliable due to concurrency). + final IBindingSet[] expected = new IBindingSet[] {// +// new ArrayBindingSet(// +// new IVariable[] { x, y },// +// new IConstant[] { new Constant<String>("John"), +// new Constant<String>("Mary") }// +// ), // + new ArrayBindingSet(// + new IVariable[] { x, y },// + new IConstant[] { new Constant<String>("Leon"), + new Constant<String>("Paul") }// + ), // +// new ArrayBindingSet(// +// new IVariable[] { x, y },// +// new IConstant[] { new Constant<String>("Mary"), +// new Constant<String>("John") }// +// ), // + new ArrayBindingSet(// + new IVariable[] { x, y },// + new IConstant[] { new Constant<String>("Mary"), + new Constant<String>("Paul") }// + ), // +// new ArrayBindingSet(// +// new IVariable[] { x, y },// +// new IConstant[] { new Constant<String>("Paul"), +// new Constant<String>("Leon") }// +// ), // + }; +// new E("John", "Mary"),// [0] +// new E("Leon", "Paul"),// [1] +// new E("Mary", "Paul"),// [2] +// new E("Paul", "Leon"),// [3] + + final RunningQuery runningQuery; + { + final IBindingSet initialBindingSet = new HashBindingSet(); + +// initialBindingSet.set(y, new Constant<String>("Paul")); + + final UUID queryId = UUID.randomUUID(); + + runningQuery = queryEngine.eval(queryId, query, + new LocalChunkMessage<IBindingSet>(queryEngine, queryId, + startId,// + -1, /* partitionId */ + newBindingSetIterator(initialBindingSet))); + } + + // verify solutions. + TestQueryEngine.assertSameSolutionsAnyOrder(expected, + new Dechunkerator<IBindingSet>(runningQuery.iterator())); + + // Wait until the query is done. + runningQuery.get(); + final Map<Integer, BOpStats> statsMap = runningQuery.getStats(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(3, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + // validate the stats for the start operator. + { + final BOpStats stats = statsMap.get(startId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("start: "+stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(1L, stats.unitsIn.get()); + assertEquals(1L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + // validate the stats for the join operator. + { + final BOpStats stats = statsMap.get(joinId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("join : "+stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(1L, stats.unitsIn.get()); + assertEquals(2L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + // validate the stats for the slice operator. + { + final BOpStats stats = statsMap.get(sliceId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("slice: "+stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(2L, stats.unitsIn.get()); + assertEquals(2L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + } + + /** * @todo Test the ability run a query reading on an access path using a * element filter (other than DISTINCT). */ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-16 19:43:08 UTC (rev 3573) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-16 20:29:51 UTC (rev 3574) @@ -44,6 +44,7 @@ import com.bigdata.bop.HashBindingSet; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstant; +import com.bigdata.bop.IConstraint; import com.bigdata.bop.IVariable; import com.bigdata.bop.IVariableOrConstant; import com.bigdata.bop.NV; @@ -52,6 +53,7 @@ import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.ap.R; import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.constraint.EQConstant; import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.LocalChunkMessage; @@ -63,6 +65,7 @@ import com.bigdata.bop.solutions.SliceOp; import com.bigdata.bop.solutions.SortOp; import com.bigdata.btree.keys.KeyBuilder; +import com.bigdata.counters.CAT; import com.bigdata.journal.BufferMode; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; @@ -331,6 +334,21 @@ } /** + * Return an {@link IAsynchronousIterator} that will read a single, chunk + * containing all of the specified {@link IBindingSet}s. + * + * @param bindingSets + * the binding sets. + */ + protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet[] bindingSets) { + + return new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { bindingSets }); + + } + + /** * Starts and stops the {@link QueryEngine}, but does not validate the * semantics of shutdown() versus shutdownNow() since we need to be * evaluating query mixes in order to verify the semantics of those @@ -404,6 +422,92 @@ } /** + * Unit test uses a {@link StartOp} to copy some binding sets through a + * {@link SliceOp} without involving any joins or access path reads. For + * this test, the binding sets never leave the query controller. + * + * @throws Exception + */ + public void test_query_startThenSlice() throws Exception { + + final int startId = 1; + final int sliceId = 4; + + final StartOp startOp = new StartOp(new BOp[] {}, NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + })); + + final BindingSetPipelineOp query = new SliceOp(new BOp[] { startOp }, + // slice annotations + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, sliceId),// + })// + ); + + // the expected solutions (order is not reliable due to concurrency). + final IBindingSet[] expected = new IBindingSet[] {// + new ArrayBindingSet(// + new IVariable[] { Var.var("value") },// + new IConstant[] { new Constant<String>("Paul") }// + ), // + new ArrayBindingSet(// + new IVariable[] { Var.var("value") },// + new IConstant[] { new Constant<String>("John") }// + ) }; + + final UUID queryId = UUID.randomUUID(); + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + new LocalChunkMessage<IBindingSet>(queryEngine, queryId, + startId,// + -1, /* partitionId */ + newBindingSetIterator(expected))); + + // verify solutions. + TestQueryEngine.assertSameSolutionsAnyOrder(expected, + new Dechunkerator<IBindingSet>(runningQuery.iterator())); + + // Wait until the query is done. + runningQuery.get(); + final Map<Integer, BOpStats> statsMap = runningQuery.getStats(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(2, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + // validate the stats for the start operator. + { + final BOpStats stats = statsMap.get(startId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("start: "+stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals((long) expected.length, stats.unitsIn.get()); + assertEquals((long) expected.length, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + // validate the stats for the slice operator. + { + final BOpStats stats = statsMap.get(sliceId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("slice: " + stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals((long) expected.length, stats.unitsIn.get()); + assertEquals((long) expected.length, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + } + + /** * Test the ability run a simple join. There are three operators. One feeds * an empty binding set[] into the join, another is the predicate for the * access path on which the join will read (it probes the index once for @@ -448,7 +552,7 @@ ); final BindingSetPipelineOp query = new SliceOp(new BOp[] { joinOp }, - // slice annotations + // slice annotations NV.asMap(new NV[] {// new NV(Predicate.Annotations.BOP_ID, sliceId),// })// @@ -533,16 +637,24 @@ /** * Test the ability run a simple join which is mapped across two index - * partitions. There are three operators. One feeds an empty binding set[] - * in which the 2nd column of the relation is bound into the join, another - * is the predicate for the access path on which the join will read (it will - * read everything since the primary key is on the first column then the - * second column and hence can not be used to select the index partition for - * this access path), and the third is the join itself. + * partitions. * - * @throws Exception + * FIXME This is failing because the {@link SliceOp} is not remembering its + * state across distinct invocations and is cancelling the query as soon as + * it exhausts its input. In order to have correct decision boundaries, + * slice needs to be invoked either once, concurrently if using {@link CAT} + * s, or in a series of presentations otherwise. + * <p> + * The easiest way to fix this is to have {@link SliceOp} specialize the + * {@link BOpStats}s and carry its state there. That will also make it safe + * for concurrent evaluation within the same query, and we will have to + * write a unit test for that. + * <p> + * I am not yet convinced that the problem with the test failure is double + * invocation of {@link SliceOp}. It could also be that we are not invoking + * it the 2nd time. */ - public void test_query_join1_2shards() throws Exception { + public void test_query_join_withConstraint_readsOn2shards() throws Exception { final Var<?> x = Var.var("x"); final Var<?> y = Var.var("y"); @@ -556,15 +668,22 @@ new NV(Predicate.Annotations.BOP_ID, startId),// })); + /* + * + * Note: Since the index on which this reads is formed as (column1 + + * column2) the probe key will be [null] if it does not bind the first + * column. Therefore, in order to have the 2nd column constraint we have + * to model it as an IElementFilter on the predicate. + */ final Predicate<E> predOp = new Predicate<E>(new IVariableOrConstant[] { - x, new Constant<String>("Paul")}, NV + x, y}, NV .asMap(new NV[] {// new NV(Predicate.Annotations.RELATION_NAME, new String[] { namespace }),// new NV(Predicate.Annotations.PARTITION_ID, Integer .valueOf(-1)),// new NV(Predicate.Annotations.OPTIONAL, Boolean.FALSE),// - new NV(Predicate.Annotations.CONSTRAINT, null),// + new NV(Predicate.Annotations.CONSTRAINT,null),// new NV(Predicate.Annotations.EXPANDER, null),// new NV(Predicate.Annotations.BOP_ID, predId),// new NV(Predicate.Annotations.TIMESTAMP, @@ -578,6 +697,10 @@ // join annotations NV.asMap(new NV[] {// new NV(Predicate.Annotations.BOP_ID, joinId),// + // impose constraint on the join. + new NV(PipelineJoin.Annotations.CONSTRAINTS, + new IConstraint[] { new EQConstant(y, + new Constant<String>("Paul")) }),// })// ); @@ -590,31 +713,16 @@ // the expected solutions (order is not reliable due to concurrency). final IBindingSet[] expected = new IBindingSet[] {// -// new ArrayBindingSet(// -// new IVariable[] { x, y },// -// new IConstant[] { new Constant<String>("John"), -// new Constant<String>("Mary") }// -// ), // new ArrayBindingSet(// new IVariable[] { x, y },// - new IConstant[] { new Constant<String>("John"), + new IConstant[] { new Constant<String>("Leon"), new Constant<String>("Paul") }// ), // -// new ArrayBindingSet(// -// new IVariable[] { x, y },// -// new IConstant[] { new Constant<String>("Mary"), -// new Constant<String>("John") }// -// ), // new ArrayBindingSet(// new IVariable[] { x, y },// new IConstant[] { new Constant<String>("Mary"), new Constant<String>("Paul") }// ), // -// new ArrayBindingSet(// -// new IVariable[] { x, y },// -// new IConstant[] { new Constant<String>("Paul"), -// new Constant<String>("Leon") }// -// ), // }; // // partition0 // new E("John", "Mary"),// @@ -628,7 +736,7 @@ { final IBindingSet initialBindingSet = new HashBindingSet(); - initialBindingSet.set(y, new Constant<String>("Paul")); +// initialBindingSet.set(y, new Constant<String>("Paul")); final UUID queryId = UUID.randomUUID(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |