From: <tho...@us...> - 2010-09-24 14:30:46
|
Revision: 3622 http://bigdata.svn.sourceforge.net/bigdata/?rev=3622&view=rev Author: thompsonbry Date: 2010-09-24 14:30:39 +0000 (Fri, 24 Sep 2010) Log Message: ----------- All implemented unit tests for distributed query now run correctly. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Bundle.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/shards/TestMapBindingSetsOverShards.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-24 13:51:34 UTC (rev 3621) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-24 14:30:39 UTC (rev 3622) @@ -78,6 +78,12 @@ .getLogger(ChunkTask.class); /** + * Error message used when an operation which must be performed on the query + * controller is attempted on some other {@link IQueryPeer}. + */ + static protected final String ERR_NOT_CONTROLLER = "Operator only permitted on the query controller"; + + /** * The class executing the query on this node. */ final private QueryEngine queryEngine; @@ -1160,8 +1166,10 @@ */ // cancel any running operators for this query on this node. cancelled |= cancelRunningOperators(mayInterruptIfRunning); - // cancel any running operators for this query on other nodes. - cancelled |= cancelQueryOnPeers(future.getCause()); + if (controller) { + // cancel query on other peers. + cancelled |= cancelQueryOnPeers(future.getCause()); + } if (queryBuffer != null) { /* * Close the query buffer so the iterator draining the query @@ -1216,13 +1224,12 @@ return cancelled; } - + /** * Cancel the query on each node where it is known to be running. * <p> * Note: The default implementation verifies that the caller is holding the - * {@link #lock} but is otherwise a NOP. This is overridden for - * scale-out. + * {@link #lock} but is otherwise a NOP. This is overridden for scale-out. * * @param cause * When non-<code>null</code>, the cause. @@ -1230,11 +1237,15 @@ * @return <code>true</code> iff something was cancelled. * * @throws IllegalMonitorStateException - * unless the {@link #lock} is held by the current - * thread. + * unless the {@link #lock} is held by the current thread. + * @throws UnsupportedOperationException + * unless this is the query controller. */ protected boolean cancelQueryOnPeers(final Throwable cause) { + if (!controller) + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); + if (!lock.isHeldByCurrentThread()) throw new IllegalMonitorStateException(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-24 13:51:34 UTC (rev 3621) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-24 14:30:39 UTC (rev 3622) @@ -415,7 +415,7 @@ final BOp targetOp = bopIndex.get(sinkId); - if (bop == null) + if (targetOp == null) throw new IllegalStateException("Not found: " + sinkId); if(log.isTraceEnabled()) @@ -724,12 +724,10 @@ @Override protected boolean cancelQueryOnPeers(final Throwable cause) { - super.cancelQueryOnPeers(cause); + boolean cancelled = super.cancelQueryOnPeers(cause); final UUID queryId = getQueryId(); - boolean cancelled = false; - for (IQueryPeer peer : peers.values()) { try { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Bundle.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Bundle.java 2010-09-24 13:51:34 UTC (rev 3621) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Bundle.java 2010-09-24 14:30:39 UTC (rev 3622) @@ -37,7 +37,7 @@ this.fromKey = keyOrder.getFromKey(keyBuilder, asBound); - this.toKey = keyOrder.getFromKey(keyBuilder, asBound); + this.toKey = keyOrder.getToKey(keyBuilder, asBound); } @@ -94,4 +94,14 @@ private int hash = 0; + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.append("{bindingSet="+bindingSet); + sb.append(",asBound="+asBound); + sb.append(",fromKey="+BytesUtil.toString(fromKey)); + sb.append(",toKey="+BytesUtil.toString(toKey)); + sb.append("}"); + return sb.toString(); + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java 2010-09-24 13:51:34 UTC (rev 3621) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java 2010-09-24 14:30:39 UTC (rev 3622) @@ -108,7 +108,7 @@ * Note: This is tested later once we have gone through the core unit * tests for the services. */ - //suite.addTest( com.bigdata.bop.fed.TestAll.suite() ); + suite.addTest( com.bigdata.bop.fed.TestAll.suite() ); return suite; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-24 13:51:34 UTC (rev 3621) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-24 14:30:39 UTC (rev 3622) @@ -107,7 +107,10 @@ * @todo reuse the stress tests from {@link TestQueryEngine}. * * @todo verify that the peers notify the query controller when they first - * register + * register + * + * FIXME Write test of an RMI based join (this is used for some default + * graph query patterns). */ public class TestFederatedQueryEngine extends AbstractEmbeddedFederationTestCase { @@ -651,9 +654,9 @@ log.info("join : "+stats.toString()); // verify query solution stats details. - assertEquals(1L, stats.chunksIn.get()); - assertEquals(1L, stats.unitsIn.get()); - assertEquals(5L, stats.unitsOut.get()); + assertEquals(2L, stats.chunksIn.get()); // two shards. + assertEquals(2L, stats.unitsIn.get()); // two shards, one empty bset each. + assertEquals(5L, stats.unitsOut.get()); // total of 5 tuples read across both shards. assertEquals(2L, stats.chunksOut.get()); // since we read on both shards. } @@ -809,9 +812,9 @@ // verify query solution stats details. assertEquals(2L, stats.chunksIn.get()); // since we read on two shards. - assertEquals(1L, stats.unitsIn.get()); // a single empty binding set. - assertEquals(5L, stats.unitsOut.get()); // each of the tuples will be read. - assertEquals(2L, stats.chunksOut.get()); // since we read on both shards. + assertEquals(2L, stats.unitsIn.get()); // a single empty binding set for each. + assertEquals(2L, stats.unitsOut.get()); // one tuple on each shard will satisfy the constraint. + assertEquals(2L, stats.chunksOut.get()); // since we read on both shards and both shards have one tuple which joins. } // validate the stats for the slice operator. @@ -831,12 +834,12 @@ } /** - * Test the ability run a simple join reading on a single shard. There are - * three operators. One feeds an empty binding set[] into the join, another - * is the predicate for the access path on which the join will read (it - * probes the index once for "Mary" and binds "Paul" and "John" when it does - * so), and the third is the join itself (there are two solutions, which are - * "value=Paul" and value="John"). + * Test the ability to run a simple join reading on a single shard. There + * are three operators. One feeds an empty binding set[] into the join, + * another is the predicate for the access path on which the join will read + * (it probes the index once for "Mary" and binds "Paul" and "John" when it + * does so), and the third is the join itself (there are two solutions, + * which are value="Paul" and value="John"). */ public void test_query_join_1shard() throws Exception { @@ -1096,15 +1099,15 @@ // verify solutions. { - // the expected solution (just one). + // the expected solutions. final IBindingSet[] expected = new IBindingSet[] {// - new ArrayBindingSet(// + new ArrayBindingSet(// partition1 new IVariable[] { Var.var("x"), Var.var("y"), Var.var("z") },// new IConstant[] { new Constant<String>("Mary"), new Constant<String>("Paul"), new Constant<String>("Leon") }// ),// - new ArrayBindingSet(// + new ArrayBindingSet(// partition0 new IVariable[] { Var.var("x"), Var.var("y"), Var.var("z") },// new IConstant[] { new Constant<String>("Mary"), new Constant<String>("John"), @@ -1114,6 +1117,13 @@ TestQueryEngine.assertSameSolutionsAnyOrder(expected, new Dechunkerator<IBindingSet>(runningQuery.iterator())); +// // partition0 +// new E("John", "Mary"),// +// new E("Leon", "Paul"),// +// // partition1 +// new E("Mary", "John"),// +// new E("Mary", "Paul"),// +// new E("Paul", "Leon"),// } // Wait until the query is done. @@ -1122,7 +1132,7 @@ { // validate the stats map. assertNotNull(statsMap); - assertEquals(3, statsMap.size()); + assertEquals(4, statsMap.size()); if (log.isInfoEnabled()) log.info(statsMap.toString()); } @@ -1149,10 +1159,10 @@ log.info("join1: " + stats.toString()); // verify query solution stats details. - assertEquals(1L, stats.chunksIn.get()); - assertEquals(1L, stats.unitsIn.get()); + assertEquals(1L, stats.chunksIn.get()); // reads only on one shard. + assertEquals(1L, stats.unitsIn.get()); // the initial binding set. assertEquals(2L, stats.unitsOut.get()); - assertEquals(1L, stats.chunksOut.get()); // @todo depends on where the shards are. + assertEquals(1L, stats.chunksOut.get()); // one chunk out, but will be mapped over two shards. } // validate the stats for the 2nd join operator. @@ -1163,10 +1173,10 @@ log.info("join2: " + stats.toString()); // verify query solution stats details. - assertEquals(1L, stats.chunksIn.get()); // @todo depends on where the shards are. - assertEquals(2L, stats.unitsIn.get()); - assertEquals(2L, stats.unitsOut.get()); - assertEquals(1L, stats.chunksOut.get()); // @todo depends on where the shards are. + assertEquals(2L, stats.chunksIn.get()); // one chunk per shard on which we will read. + assertEquals(2L, stats.unitsIn.get()); // one binding set in per shard. + assertEquals(2L, stats.unitsOut.get()); // one solution per shard. + assertEquals(2L, stats.chunksOut.get()); // since join ran on two shards and each had one solution. } // validate stats for the sliceOp (on the query controller) @@ -1177,10 +1187,10 @@ log.info("slice: " + stats.toString()); // verify query solution stats details. - assertEquals(1L, stats.chunksIn.get()); // @todo? - assertEquals(2L, stats.unitsIn.get()); - assertEquals(2L, stats.unitsOut.get()); - assertEquals(1L, stats.chunksOut.get()); // @todo? + assertEquals(2L, stats.chunksIn.get()); // one chunk from each shard of join2 with a solution. + assertEquals(2L, stats.unitsIn.get()); // one solution per shard for join2. + assertEquals(2L, stats.unitsOut.get()); // slice passes all units. + assertEquals(2L, stats.chunksOut.get()); // slice runs twice. } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/shards/TestMapBindingSetsOverShards.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/shards/TestMapBindingSetsOverShards.java 2010-09-24 13:51:34 UTC (rev 3621) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/shards/TestMapBindingSetsOverShards.java 2010-09-24 14:30:39 UTC (rev 3622) @@ -64,11 +64,15 @@ import com.bigdata.striterator.IKeyOrder; /** - * Unit tests for {@link MapBindingSetsOverShardsBuffer}. + * Unit tests for {@link MapBindingSetsOverShardsBuffer}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id: TestMapBindingSetsOverShards.java 3448 2010-08-18 20:55:58Z * thompsonbry $ + * + * FIXME More unit tests. It appears that none of these tests cover the + * case where there is a shared prefix, e.g., because at least one + * component of the selected key order is bound. */ public class TestMapBindingSetsOverShards extends AbstractEmbeddedFederationTestCase { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |