|
From: <tho...@us...> - 2010-09-16 20:29:58
|
Revision: 3574
http://bigdata.svn.sourceforge.net/bigdata/?rev=3574&view=rev
Author: thompsonbry
Date: 2010-09-16 20:29:51 +0000 (Thu, 16 Sep 2010)
Log Message:
-----------
Added a unit test for the query engine in which an IConstraint is applied.
Identified a problem with multiple concurrent evaluations of SliceOp. It needs to be modified to share the same state across invocations and to use CATs (or an AtomicLong, chunk-wise locking, or per-bop-invocation locking) to prevent concurrency failures (such as letting too many solutions through).
Modified Paths:
--------------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/EQConstant.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-16 19:43:08 UTC (rev 3573)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-16 20:29:51 UTC (rev 3574)
@@ -638,6 +638,14 @@
}
+ if (log.isTraceEnabled()) {
+ // Log at TRACE so the call matches the isTraceEnabled() guard.
+ log.trace("Accepted by "
+ + constraint.getClass().getSimpleName() + " : "
+ + bindingSet);
+
+ }
+
}
return true;
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/EQConstant.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/EQConstant.java 2010-09-16 19:43:08 UTC (rev 3573)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/EQConstant.java 2010-09-16 20:29:51 UTC (rev 3574)
@@ -70,15 +70,21 @@
}
- public boolean accept(final IBindingSet s) {
+ public boolean accept(final IBindingSet bset) { // true unless var is bound to a value other than the constant.
+ final IVariable<?> var = (IVariable<?>) get(0)/* var */; // argument 0: the constrained variable.
+
// get binding for the variable.
- final IConstant<?> tmp = s.get((IVariable<?>) get(0)/* var */);
+ final IConstant<?> asBound = bset.get(var); // null when var is not yet bound.
- if (tmp == null)
+ if (asBound == null)
return true; // not yet bound.
- return tmp.equals(get(1));
+ final IConstant<?> cnst = (IConstant<?>) get(1); // argument 1: the required constant.
+
+ final boolean ret = asBound.equals(cnst); // equality as defined by IConstant#equals.
+
+ return ret;
}
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-16 19:43:08 UTC (rev 3573)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-16 20:29:51 UTC (rev 3574)
@@ -1730,10 +1730,14 @@
}
if (log.isDebugEnabled())
- log.debug("Accepted element for " + naccepted
- + " of " + bindingSets.length
- + " possible bindingSet combinations: "
- + e.toString() + ", joinOp=" + joinOp);
+ if (naccepted == 0) { // sole body of the brace-less isDebugEnabled() guard; the else binds here.
+ log.debug("Rejected element: " + e.toString()); // accepted for none of the binding sets.
+ } else {
+ log.debug("Accepted element for " + naccepted
+ + " of " + bindingSets.length
+ + " possible bindingSet combinations: "
+ + e.toString()); // NOTE(review): joinOp is no longer included in this message.
+ }
}
// if something is accepted in the chunk return true.
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-16 19:43:08 UTC (rev 3573)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-16 20:29:51 UTC (rev 3574)
@@ -50,6 +50,7 @@
import com.bigdata.bop.HashBindingSet;
import com.bigdata.bop.IBindingSet;
import com.bigdata.bop.IConstant;
+import com.bigdata.bop.IConstraint;
import com.bigdata.bop.IVariable;
import com.bigdata.bop.IVariableOrConstant;
import com.bigdata.bop.NV;
@@ -59,6 +60,7 @@
import com.bigdata.bop.ap.R;
import com.bigdata.bop.bset.ConditionalRoutingOp;
import com.bigdata.bop.bset.StartOp;
+import com.bigdata.bop.constraint.EQConstant;
import com.bigdata.bop.fed.TestFederatedQueryEngine;
import com.bigdata.bop.join.PipelineJoin;
import com.bigdata.bop.solutions.SliceOp;
@@ -68,6 +70,7 @@
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.relation.accesspath.ThickAsynchronousIterator;
import com.bigdata.striterator.ChunkedArrayIterator;
+import com.bigdata.striterator.Dechunkerator;
import com.bigdata.striterator.ICloseableIterator;
import com.bigdata.util.concurrent.LatchedExecutor;
import com.ibm.icu.impl.ByteBuffer;
@@ -515,6 +518,173 @@
}
/**
+ * Test a join with an {@link IConstraint} ({@link EQConstant} requiring y == "Paul").
+ */
+ public void test_query_join_withConstraint() throws Exception {
+
+ final Var<?> x = Var.var("x");
+ final Var<?> y = Var.var("y");
+
+ final int startId = 1;
+ final int joinId = 2;
+ final int predId = 3;
+ final int sliceId = 4;
+
+ final StartOp startOp = new StartOp(new BOp[] {}, NV.asMap(new NV[] {//
+ new NV(Predicate.Annotations.BOP_ID, startId),//
+ }));
+
+ /*
+ * Note: Since the index on which this reads is formed as (column1 +
+ * column2), the probe key will be [null] if it does not bind the first
+ * column. Therefore, the 2nd column constraint (y == "Paul") is not
+ * modeled on the predicate; it is imposed on the join as an
+ * IConstraint (EQConstant), below.
+ */
+ final Predicate<E> predOp = new Predicate<E>(new IVariableOrConstant[] {
+ x, y}, NV
+ .asMap(new NV[] {//
+ new NV(Predicate.Annotations.RELATION_NAME,
+ new String[] { namespace }),//
+ new NV(Predicate.Annotations.PARTITION_ID, Integer
+ .valueOf(-1)),//
+ new NV(Predicate.Annotations.OPTIONAL, Boolean.FALSE),//
+ new NV(Predicate.Annotations.CONSTRAINT,null),//
+ new NV(Predicate.Annotations.EXPANDER, null),//
+ new NV(Predicate.Annotations.BOP_ID, predId),//
+ new NV(Predicate.Annotations.TIMESTAMP,
+ ITx.READ_COMMITTED),//
+ new NV(Predicate.Annotations.KEY_ORDER,
+ R.primaryKeyOrder),//
+ }));
+
+ final PipelineJoin<E> joinOp = new PipelineJoin<E>(startOp/* left */,
+ predOp/* right */,
+ // join annotations
+ NV.asMap(new NV[] {//
+ new NV(Predicate.Annotations.BOP_ID, joinId),//
+ // impose constraint on the join.
+ new NV(PipelineJoin.Annotations.CONSTRAINTS,
+ new IConstraint[] { new EQConstant(y,
+ new Constant<String>("Paul")) }),//
+ })//
+ );
+
+ final BindingSetPipelineOp query = new SliceOp(new BOp[] { joinOp },
+ // slice annotations
+ NV.asMap(new NV[] {//
+ new NV(Predicate.Annotations.BOP_ID, sliceId),//
+ })//
+ );
+
+ // the expected solutions (order is not reliable due to concurrency); commented-out entries fail y == "Paul".
+ final IBindingSet[] expected = new IBindingSet[] {//
+// new ArrayBindingSet(//
+// new IVariable[] { x, y },//
+// new IConstant[] { new Constant<String>("John"),
+// new Constant<String>("Mary") }//
+// ), //
+ new ArrayBindingSet(//
+ new IVariable[] { x, y },//
+ new IConstant[] { new Constant<String>("Leon"),
+ new Constant<String>("Paul") }//
+ ), //
+// new ArrayBindingSet(//
+// new IVariable[] { x, y },//
+// new IConstant[] { new Constant<String>("Mary"),
+// new Constant<String>("John") }//
+// ), //
+ new ArrayBindingSet(//
+ new IVariable[] { x, y },//
+ new IConstant[] { new Constant<String>("Mary"),
+ new Constant<String>("Paul") }//
+ ), //
+// new ArrayBindingSet(//
+// new IVariable[] { x, y },//
+// new IConstant[] { new Constant<String>("Paul"),
+// new Constant<String>("Leon") }//
+// ), //
+ };
+// new E("John", "Mary"),// [0]
+// new E("Leon", "Paul"),// [1]
+// new E("Mary", "Paul"),// [2]
+// new E("Paul", "Leon"),// [3]
+
+ final RunningQuery runningQuery;
+ {
+ final IBindingSet initialBindingSet = new HashBindingSet();
+
+// initialBindingSet.set(y, new Constant<String>("Paul"));
+
+ final UUID queryId = UUID.randomUUID();
+
+ runningQuery = queryEngine.eval(queryId, query,
+ new LocalChunkMessage<IBindingSet>(queryEngine, queryId,
+ startId,//
+ -1, /* partitionId */
+ newBindingSetIterator(initialBindingSet)));
+ }
+
+ // verify solutions.
+ TestQueryEngine.assertSameSolutionsAnyOrder(expected,
+ new Dechunkerator<IBindingSet>(runningQuery.iterator()));
+
+ // Wait until the query is done.
+ runningQuery.get();
+ final Map<Integer, BOpStats> statsMap = runningQuery.getStats();
+ {
+ // validate the stats map.
+ assertNotNull(statsMap);
+ assertEquals(3, statsMap.size()); // start, join and slice (no entry for the predicate).
+ if (log.isInfoEnabled())
+ log.info(statsMap.toString());
+ }
+
+ // validate the stats for the start operator.
+ {
+ final BOpStats stats = statsMap.get(startId);
+ assertNotNull(stats);
+ if (log.isInfoEnabled())
+ log.info("start: "+stats.toString());
+
+ // verify query solution stats details.
+ assertEquals(1L, stats.chunksIn.get());
+ assertEquals(1L, stats.unitsIn.get());
+ assertEquals(1L, stats.unitsOut.get());
+ assertEquals(1L, stats.chunksOut.get());
+ }
+
+ // validate the stats for the join operator.
+ {
+ final BOpStats stats = statsMap.get(joinId);
+ assertNotNull(stats);
+ if (log.isInfoEnabled())
+ log.info("join : "+stats.toString());
+
+ // verify query solution stats details.
+ assertEquals(1L, stats.chunksIn.get());
+ assertEquals(1L, stats.unitsIn.get());
+ assertEquals(2L, stats.unitsOut.get()); // only the 2 expected solutions pass the constraint.
+ assertEquals(1L, stats.chunksOut.get());
+ }
+
+ // validate the stats for the slice operator.
+ {
+ final BOpStats stats = statsMap.get(sliceId);
+ assertNotNull(stats);
+ if (log.isInfoEnabled())
+ log.info("slice: "+stats.toString());
+
+ // verify query solution stats details.
+ assertEquals(1L, stats.chunksIn.get());
+ assertEquals(2L, stats.unitsIn.get());
+ assertEquals(2L, stats.unitsOut.get());
+ assertEquals(1L, stats.chunksOut.get());
+ }
+
+ }
+
+ /**
* @todo Test the ability run a query reading on an access path using a
* element filter (other than DISTINCT).
*/
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-16 19:43:08 UTC (rev 3573)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-16 20:29:51 UTC (rev 3574)
@@ -44,6 +44,7 @@
import com.bigdata.bop.HashBindingSet;
import com.bigdata.bop.IBindingSet;
import com.bigdata.bop.IConstant;
+import com.bigdata.bop.IConstraint;
import com.bigdata.bop.IVariable;
import com.bigdata.bop.IVariableOrConstant;
import com.bigdata.bop.NV;
@@ -52,6 +53,7 @@
import com.bigdata.bop.ap.Predicate;
import com.bigdata.bop.ap.R;
import com.bigdata.bop.bset.StartOp;
+import com.bigdata.bop.constraint.EQConstant;
import com.bigdata.bop.engine.BOpStats;
import com.bigdata.bop.engine.IChunkMessage;
import com.bigdata.bop.engine.LocalChunkMessage;
@@ -63,6 +65,7 @@
import com.bigdata.bop.solutions.SliceOp;
import com.bigdata.bop.solutions.SortOp;
import com.bigdata.btree.keys.KeyBuilder;
+import com.bigdata.counters.CAT;
import com.bigdata.journal.BufferMode;
import com.bigdata.journal.ITx;
import com.bigdata.journal.Journal;
@@ -331,6 +334,21 @@
}
/**
+ * Return an {@link IAsynchronousIterator} that will read a single chunk
+ * containing all of the specified {@link IBindingSet}s.
+ *
+ * @param bindingSets the binding sets, which together form that one chunk.
+ * @return an iterator over the single chunk {@code bindingSets}.
+ */
+ protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator(
+ final IBindingSet[] bindingSets) {
+
+ return new ThickAsynchronousIterator<IBindingSet[]>(
+ new IBindingSet[][] { bindingSets });
+
+ }
+
+ /**
* Starts and stops the {@link QueryEngine}, but does not validate the
* semantics of shutdown() versus shutdownNow() since we need to be
* evaluating query mixes in order to verify the semantics of those
@@ -404,6 +422,92 @@
}
/**
+ * Unit test uses a {@link StartOp} to copy some binding sets through a
+ * {@link SliceOp} without involving any joins or access path reads. For
+ * this test, the binding sets never leave the query controller.
+ *
+ * @throws Exception
+ */
+ public void test_query_startThenSlice() throws Exception {
+
+ final int startId = 1;
+ final int sliceId = 4;
+
+ final StartOp startOp = new StartOp(new BOp[] {}, NV.asMap(new NV[] {//
+ new NV(Predicate.Annotations.BOP_ID, startId),//
+ }));
+
+ final BindingSetPipelineOp query = new SliceOp(new BOp[] { startOp },
+ // slice annotations
+ NV.asMap(new NV[] {//
+ new NV(Predicate.Annotations.BOP_ID, sliceId),//
+ })//
+ );
+
+ // the expected solutions (order is not reliable due to concurrency); they are also the query's input.
+ final IBindingSet[] expected = new IBindingSet[] {//
+ new ArrayBindingSet(//
+ new IVariable[] { Var.var("value") },//
+ new IConstant[] { new Constant<String>("Paul") }//
+ ), //
+ new ArrayBindingSet(//
+ new IVariable[] { Var.var("value") },//
+ new IConstant[] { new Constant<String>("John") }//
+ ) };
+
+ final UUID queryId = UUID.randomUUID();
+ final RunningQuery runningQuery = queryEngine.eval(queryId, query,
+ new LocalChunkMessage<IBindingSet>(queryEngine, queryId,
+ startId,//
+ -1, /* partitionId */
+ newBindingSetIterator(expected))); // a single chunk holding every expected solution.
+
+ // verify solutions.
+ TestQueryEngine.assertSameSolutionsAnyOrder(expected,
+ new Dechunkerator<IBindingSet>(runningQuery.iterator()));
+
+ // Wait until the query is done.
+ runningQuery.get();
+ final Map<Integer, BOpStats> statsMap = runningQuery.getStats();
+ {
+ // validate the stats map.
+ assertNotNull(statsMap);
+ assertEquals(2, statsMap.size()); // start and slice only.
+ if (log.isInfoEnabled())
+ log.info(statsMap.toString());
+ }
+
+ // validate the stats for the start operator.
+ {
+ final BOpStats stats = statsMap.get(startId);
+ assertNotNull(stats);
+ if (log.isInfoEnabled())
+ log.info("start: "+stats.toString());
+
+ // verify query solution stats details.
+ assertEquals(1L, stats.chunksIn.get());
+ assertEquals((long) expected.length, stats.unitsIn.get());
+ assertEquals((long) expected.length, stats.unitsOut.get());
+ assertEquals(1L, stats.chunksOut.get());
+ }
+
+ // validate the stats for the slice operator.
+ {
+ final BOpStats stats = statsMap.get(sliceId);
+ assertNotNull(stats);
+ if (log.isInfoEnabled())
+ log.info("slice: " + stats.toString());
+
+ // verify query solution stats details.
+ assertEquals(1L, stats.chunksIn.get());
+ assertEquals((long) expected.length, stats.unitsIn.get());
+ assertEquals((long) expected.length, stats.unitsOut.get());
+ assertEquals(1L, stats.chunksOut.get());
+ }
+
+ }
+
+ /**
* Test the ability run a simple join. There are three operators. One feeds
* an empty binding set[] into the join, another is the predicate for the
* access path on which the join will read (it probes the index once for
@@ -448,7 +552,7 @@
);
final BindingSetPipelineOp query = new SliceOp(new BOp[] { joinOp },
- // slice annotations
+ // slice annotations
NV.asMap(new NV[] {//
new NV(Predicate.Annotations.BOP_ID, sliceId),//
})//
@@ -533,16 +637,24 @@
/**
* Test the ability run a simple join which is mapped across two index
- * partitions. There are three operators. One feeds an empty binding set[]
- * in which the 2nd column of the relation is bound into the join, another
- * is the predicate for the access path on which the join will read (it will
- * read everything since the primary key is on the first column then the
- * second column and hence can not be used to select the index partition for
- * this access path), and the third is the join itself.
+ * partitions.
*
- * @throws Exception
+ * FIXME This is failing because the {@link SliceOp} is not remembering its
+ * state across distinct invocations and is cancelling the query as soon as
+ * it exhausts its input. In order to have correct decision boundaries,
+ * slice needs to be invoked either once, concurrently if using {@link CAT}s,
+ * or in a series of presentations otherwise.
+ * <p>
+ * The easiest way to fix this is to have {@link SliceOp} specialize the
+ * {@link BOpStats}s and carry its state there. That will also make it safe
+ * for concurrent evaluation within the same query, and we will have to
+ * write a unit test for that.
+ * <p>
+ * I am not yet convinced that the problem with the test failure is double
+ * invocation of {@link SliceOp}. It could also be that we are not invoking
+ * it the 2nd time.
*/
- public void test_query_join1_2shards() throws Exception {
+ public void test_query_join_withConstraint_readsOn2shards() throws Exception {
final Var<?> x = Var.var("x");
final Var<?> y = Var.var("y");
@@ -556,15 +668,22 @@
new NV(Predicate.Annotations.BOP_ID, startId),//
}));
+ /*
+ * Note: Since the index on which this reads is formed as (column1 +
+ * column2), the probe key will be [null] if it does not bind the first
+ * column. Therefore, the 2nd column constraint (y == "Paul") is not
+ * modeled on the predicate; it is imposed on the join as an
+ * IConstraint (EQConstant), below.
+ */
final Predicate<E> predOp = new Predicate<E>(new IVariableOrConstant[] {
- x, new Constant<String>("Paul")}, NV
+ x, y}, NV
.asMap(new NV[] {//
new NV(Predicate.Annotations.RELATION_NAME,
new String[] { namespace }),//
new NV(Predicate.Annotations.PARTITION_ID, Integer
.valueOf(-1)),//
new NV(Predicate.Annotations.OPTIONAL, Boolean.FALSE),//
- new NV(Predicate.Annotations.CONSTRAINT, null),//
+ new NV(Predicate.Annotations.CONSTRAINT,null),//
new NV(Predicate.Annotations.EXPANDER, null),//
new NV(Predicate.Annotations.BOP_ID, predId),//
new NV(Predicate.Annotations.TIMESTAMP,
@@ -578,6 +697,10 @@
// join annotations
NV.asMap(new NV[] {//
new NV(Predicate.Annotations.BOP_ID, joinId),//
+ // impose constraint on the join.
+ new NV(PipelineJoin.Annotations.CONSTRAINTS,
+ new IConstraint[] { new EQConstant(y,
+ new Constant<String>("Paul")) }),//
})//
);
@@ -590,31 +713,16 @@
// the expected solutions (order is not reliable due to concurrency).
final IBindingSet[] expected = new IBindingSet[] {//
-// new ArrayBindingSet(//
-// new IVariable[] { x, y },//
-// new IConstant[] { new Constant<String>("John"),
-// new Constant<String>("Mary") }//
-// ), //
new ArrayBindingSet(//
new IVariable[] { x, y },//
- new IConstant[] { new Constant<String>("John"),
+ new IConstant[] { new Constant<String>("Leon"),
new Constant<String>("Paul") }//
), //
-// new ArrayBindingSet(//
-// new IVariable[] { x, y },//
-// new IConstant[] { new Constant<String>("Mary"),
-// new Constant<String>("John") }//
-// ), //
new ArrayBindingSet(//
new IVariable[] { x, y },//
new IConstant[] { new Constant<String>("Mary"),
new Constant<String>("Paul") }//
), //
-// new ArrayBindingSet(//
-// new IVariable[] { x, y },//
-// new IConstant[] { new Constant<String>("Paul"),
-// new Constant<String>("Leon") }//
-// ), //
};
// // partition0
// new E("John", "Mary"),//
@@ -628,7 +736,7 @@
{
final IBindingSet initialBindingSet = new HashBindingSet();
- initialBindingSet.set(y, new Constant<String>("Paul"));
+// initialBindingSet.set(y, new Constant<String>("Paul"));
final UUID queryId = UUID.randomUUID();
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|