From: <tho...@us...> - 2010-09-15 10:22:41
|
Revision: 3555 http://bigdata.svn.sourceforge.net/bigdata/?rev=3555&view=rev Author: thompsonbry Date: 2010-09-15 10:22:34 +0000 (Wed, 15 Sep 2010) Log Message: ----------- Modified QueryEngine#eval() to accept the initial binding set chunk. Modified TestFederatedQueryEngine to create a local query controller to which the queries are submitted. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java 2010-09-14 20:48:21 UTC (rev 3554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java 2010-09-15 10:22:34 UTC (rev 3555) @@ -1,108 +0,0 @@ -package com.bigdata.bop.engine; - -import java.io.Serializable; - -import com.bigdata.bop.BOp; -import com.bigdata.bop.fed.FederatedRunningQuery; -import com.bigdata.relation.accesspath.IAsynchronousIterator; - -/** - * An non-{@link Serializable} chunk of intermediate results which are ready to - * be consumed by some {@link BOp} in a specific query (this is only used in - * query evaluation for the standalone database). - */ -public class BindingSetChunk<E> implements IChunkMessage<E> { - - /** The query controller. */ - private final IQueryClient queryController; - - /** - * The query identifier. - */ - private final long queryId; - - /** - * The target {@link BOp}. - */ - private final int bopId; - - /** - * The index partition which is being targeted for that {@link BOp}. - */ - private final int partitionId; - - /** - * The binding sets to be consumed by that {@link BOp}. - */ - private IAsynchronousIterator<E[]> source; - - public IQueryClient getQueryController() { - return queryController; - } - - public long getQueryId() { - return queryId; - } - - public int getBOpId() { - return bopId; - } - - public int getPartitionId() { - return partitionId; - } - - public boolean isMaterialized() { - return true; - } - - public BindingSetChunk(final IQueryClient queryController, - final long queryId, final int bopId, final int partitionId, - final IAsynchronousIterator<E[]> source) { - - if (queryController == null) - throw new IllegalArgumentException(); - - if (source == null) - throw new IllegalArgumentException(); - - this.queryController = queryController; - - this.queryId = queryId; - - this.bopId = bopId; - - this.partitionId = partitionId; - - this.source = source; - - } - - public String toString() { - - return getClass().getName() + "{queryId=" + queryId + ",bopId=" + bopId - + ",partitionId=" + partitionId + "}"; - - } - - public void materialize(FederatedRunningQuery runningQuery) { - // NOP - } - - public void release() { - // NOP - } - - public IChunkAccessor<E> getChunkAccessor() { - return new ChunkAccessor(); - } - - private class ChunkAccessor implements IChunkAccessor<E> { - - public IAsynchronousIterator<E[]> iterator() { - return source; - } - - } - -} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-14 20:48:21 UTC (rev 3554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-15 10:22:34 UTC (rev 3555) @@ -3,7 +3,6 @@ import java.rmi.RemoteException; import com.bigdata.bop.BindingSetPipelineOp; -import com.bigdata.bop.IBindingSet; /** * Interface for a client executing queries (the query controller). @@ -11,25 +10,6 @@ public interface IQueryClient extends IQueryPeer { /** - * Evaluate a query which visits {@link IBindingSet}s, such as a join. This - * node will serve as the controller for the query. - * - * @param queryId - * The unique identifier for the query. - * @param query - * The query to evaluate. - * - * @return An iterator visiting {@link IBindingSet}s which result from - * evaluating the query. - * - * @throws IllegalStateException - * if the {@link QueryEngine} has been {@link #shutdown()}. - * @throws Exception - * @throws RemoteException - */ - RunningQuery eval(long queryId, BindingSetPipelineOp query) throws Exception, RemoteException; - - /** * Return the query. * * @param queryId Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java (from rev 3554, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java 2010-09-15 10:22:34 UTC (rev 3555) @@ -0,0 +1,110 @@ +package com.bigdata.bop.engine; + +import java.io.Serializable; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.fed.FederatedRunningQuery; +import com.bigdata.relation.accesspath.IAsynchronousIterator; + +/** + * An non-{@link Serializable} chunk of intermediate results which are ready to + * be consumed by some {@link BOp} in a specific query (this is only used in + * query evaluation for the standalone database). + * + * @todo test suite + */ +public class LocalChunkMessage<E> implements IChunkMessage<E> { + + /** The query controller. */ + private final IQueryClient queryController; + + /** + * The query identifier. + */ + private final long queryId; + + /** + * The target {@link BOp}. + */ + private final int bopId; + + /** + * The index partition which is being targeted for that {@link BOp}. + */ + private final int partitionId; + + /** + * The binding sets to be consumed by that {@link BOp}. + */ + private IAsynchronousIterator<E[]> source; + + public IQueryClient getQueryController() { + return queryController; + } + + public long getQueryId() { + return queryId; + } + + public int getBOpId() { + return bopId; + } + + public int getPartitionId() { + return partitionId; + } + + public boolean isMaterialized() { + return true; + } + + public LocalChunkMessage(final IQueryClient queryController, + final long queryId, final int bopId, final int partitionId, + final IAsynchronousIterator<E[]> source) { + + if (queryController == null) + throw new IllegalArgumentException(); + + if (source == null) + throw new IllegalArgumentException(); + + this.queryController = queryController; + + this.queryId = queryId; + + this.bopId = bopId; + + this.partitionId = partitionId; + + this.source = source; + + } + + public String toString() { + + return getClass().getName() + "{queryId=" + queryId + ",bopId=" + bopId + + ",partitionId=" + partitionId + "}"; + + } + + public void materialize(FederatedRunningQuery runningQuery) { + // NOP + } + + public void release() { + // NOP + } + + public IChunkAccessor<E> getChunkAccessor() { + return new ChunkAccessor(); + } + + private class ChunkAccessor implements IChunkAccessor<E> { + + public IAsynchronousIterator<E[]> iterator() { + return source; + } + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-14 20:48:21 UTC (rev 3554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-15 10:22:34 UTC (rev 3555) @@ -45,6 +45,7 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IPredicate; import com.bigdata.bop.bset.Union; +import com.bigdata.bop.fed.FederatedQueryEngine; import com.bigdata.btree.BTree; import com.bigdata.btree.IndexSegment; import com.bigdata.btree.view.FusedView; @@ -500,7 +501,7 @@ * evaluation. * <p> * Chunk concatenation could be performed here if we (a) mark the - * {@link BindingSetChunk} with a flag to indicate when it has been + * {@link LocalChunkMessage} with a flag to indicate when it has been * accepted; and (b) rip through the incoming chunks for the query for * the target bop and combine them to feed the task. Chunks which have * already been assigned would be dropped when take() discovers them. @@ -734,18 +735,57 @@ } + /** + * Evaluate a query which visits {@link IBindingSet}s, such as a join. This + * node will serve as the controller for the query. + * + * @param queryId + * The unique identifier for the query. + * @param query + * The query to evaluate. + * + * @return An iterator visiting {@link IBindingSet}s which result from + * evaluating the query. + * @param msg + * A message providing access to the initial {@link IBindingSet + * binding set(s)} used to begin query evaluation. + * + * @throws IllegalStateException + * if the {@link QueryEngine} has been {@link #shutdown()}. + * @throws Exception + * @throws RemoteException + * + * FIXME The test suites need to be modified to create a local + * {@link FederatedQueryEngine} object which fronts for an + * {@link IIndexManager} which is local to the client - not on a + * data service at all. This is necessary in order for the unit + * test (or application code) to directly access the + * RunningQuery reference, which is needed to use get() (to wait + * for the query), iterator() (to drain the query), etc. + * <p> + * This will also give us a place to hang query-local resources + * on the client. + * <p> + * This has to be a {@link FederatedQueryEngine} because it + * needs to talk to a federation. There should be nothing DS + * specific about the {@link FederatedQueryEngine}. + */ public RunningQuery eval(final long queryId, - final BindingSetPipelineOp query) throws Exception { + final BindingSetPipelineOp query, + final IChunkMessage<IBindingSet> msg) throws Exception { if (query == null) throw new IllegalArgumentException(); + if (msg == null) + throw new IllegalArgumentException(); + + if (queryId != msg.getQueryId()) // @todo use equals() to compare UUIDs. + throw new IllegalArgumentException(); + final RunningQuery runningQuery = newRunningQuery(this, queryId, -// System.currentTimeMillis()/* begin */, true/* controller */, this/* clientProxy */, query); - assertRunning(); - final long timeout = query.getProperty(BOp.Annotations.TIMEOUT, BOp.Annotations.DEFAULT_TIMEOUT); @@ -767,8 +807,12 @@ } + assertRunning(); + putRunningQuery(queryId, runningQuery); + runningQuery.startQuery(msg); + return runningQuery; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-14 20:48:21 UTC (rev 3554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-15 10:22:34 UTC (rev 3555) @@ -366,7 +366,7 @@ /* * Note: The partitionId will always be -1 in scale-up. */ - final BindingSetChunk<IBindingSet> chunk = new BindingSetChunk<IBindingSet>( + final LocalChunkMessage<IBindingSet> chunk = new LocalChunkMessage<IBindingSet>( clientProxy, queryId, sinkId, -1/* partitionId */, sink .iterator()); @@ -646,11 +646,9 @@ /** * Invoked once by the query controller with the initial - * {@link BindingSetChunk} which gets the query moving. - * - * @todo this should reject multiple invocations for a given query instance. + * {@link IChunkMessage} which gets the query moving. */ - public void startQuery(final IChunkMessage<IBindingSet> msg) { + void startQuery(final IChunkMessage<IBindingSet> msg) { if (!controller) throw new UnsupportedOperationException(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-14 20:48:21 UTC (rev 3554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-15 10:22:34 UTC (rev 3555) @@ -122,20 +122,6 @@ } /** - * Constructor used on a {@link DataService} (a query engine peer). - * - * @param dataService - * The data service. - */ - public FederatedQueryEngine(final DataService dataService) { - - this(dataService.getFederation(), - new DelegateIndexManager(dataService), dataService - .getResourceManager().getResourceService()); - - } - - /** * Overridden to strengthen the return type. * <p> * {@inheritDoc} @@ -152,6 +138,20 @@ return getClass().getName() + "{serviceUUID=" + getServiceUUID() + "}"; } + + /** + * Constructor used on a {@link DataService} (a query engine peer). + * + * @param dataService + * The data service. + */ + public FederatedQueryEngine(final DataService dataService) { + + this(dataService.getFederation(), + new DelegateIndexManager(dataService), dataService + .getResourceManager().getResourceService()); + + } /** * Constructor used on a non-{@link DataService} node to expose a query Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-14 20:48:21 UTC (rev 3554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-15 10:22:34 UTC (rev 3555) @@ -42,7 +42,7 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IPredicate; import com.bigdata.bop.IShardwisePipelineOp; -import com.bigdata.bop.engine.BindingSetChunk; +import com.bigdata.bop.engine.LocalChunkMessage; import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.IQueryClient; import com.bigdata.bop.engine.IQueryPeer; @@ -94,7 +94,7 @@ * A map associating resources with running queries. When a query halts, the * resources listed in its resource map are released. Resources can include * {@link ByteBuffer}s backing either incoming or outgoing - * {@link BindingSetChunk}s, temporary files associated with the query, hash + * {@link LocalChunkMessage}s, temporary files associated with the query, hash * tables, etc. * * @todo This map will eventually need to be moved into {@link RunningQuery} @@ -604,7 +604,7 @@ * query engine. */ - final IChunkMessage<IBindingSet> msg = new BindingSetChunk<IBindingSet>( + final IChunkMessage<IBindingSet> msg = new LocalChunkMessage<IBindingSet>( getQueryController(), getQueryId(), sinkId, partitionId, source.iterator()); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-14 20:48:21 UTC (rev 3554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-15 10:22:34 UTC (rev 3555) @@ -198,16 +198,11 @@ })); final long queryId = 1L; - final RunningQuery runningQuery = queryEngine.eval(queryId,query); + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + new LocalChunkMessage<IBindingSet>(queryEngine, queryId, + startId, -1/* partitionId */, + newBindingSetIterator(new HashBindingSet()))); - runningQuery.startQuery(new BindingSetChunk( - queryEngine, - queryId, - startId,// - -1, //partitionId - new ThickAsynchronousIterator<IBindingSet[]>( - new IBindingSet[][] { new IBindingSet[] { new HashBindingSet()} }))); - // Wait until the query is done. final Map<Integer, BOpStats> statsMap = runningQuery.get(); { @@ -290,13 +285,11 @@ ) }; final long queryId = 1L; - final RunningQuery runningQuery = queryEngine.eval(queryId, query); + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + new LocalChunkMessage<IBindingSet>(queryEngine, queryId, + startId, -1 /* partitionId */, + newBindingSetIterator(new HashBindingSet()))); - runningQuery.startQuery(new BindingSetChunk(queryEngine, queryId, - startId,// - -1, // partitionId - newBindingSetIterator(new HashBindingSet()))); - // verify solutions. assertSameSolutions(expected, runningQuery.iterator()); @@ -478,21 +471,22 @@ final BindingSetPipelineOp query = join2Op; - final long queryId = 1L; - final RunningQuery runningQuery = queryEngine.eval(queryId, query); - // start the query. + final long queryId = 1L; + final IChunkMessage<IBindingSet> initialChunkMessage; { - + final IBindingSet initialBindings = new HashBindingSet(); - + initialBindings.set(Var.var("x"), new Constant<String>("Mary")); - runningQuery.startQuery(new BindingSetChunk<IBindingSet>( - queryEngine, queryId, startId,// + initialChunkMessage = new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, startId,// -1, // partitionId - newBindingSetIterator(initialBindings))); + newBindingSetIterator(initialBindings)); } + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + initialChunkMessage); // verify solutions. { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-14 20:48:21 UTC (rev 3554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-15 10:22:34 UTC (rev 3555) @@ -27,10 +27,15 @@ package com.bigdata.bop.fed; +import java.io.File; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Map; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import junit.framework.TestCase2; @@ -51,9 +56,8 @@ import com.bigdata.bop.ap.R; import com.bigdata.bop.bset.StartOp; import com.bigdata.bop.engine.BOpStats; -import com.bigdata.bop.engine.BindingSetChunk; -import com.bigdata.bop.engine.IQueryClient; -import com.bigdata.bop.engine.IQueryPeer; +import com.bigdata.bop.engine.IChunkMessage; +import com.bigdata.bop.engine.LocalChunkMessage; import com.bigdata.bop.engine.PipelineDelayOp; import com.bigdata.bop.engine.QueryEngine; import com.bigdata.bop.engine.RunningQuery; @@ -62,17 +66,21 @@ import com.bigdata.bop.solutions.SliceOp; import com.bigdata.bop.solutions.SortOp; import com.bigdata.btree.keys.KeyBuilder; -import com.bigdata.io.SerializerUtil; +import com.bigdata.journal.BufferMode; import com.bigdata.journal.ITx; +import com.bigdata.journal.Journal; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.service.EmbeddedFederation; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.IDataService; +import com.bigdata.service.ManagedResourceService; +import com.bigdata.service.ResourceService; import com.bigdata.service.jini.JiniClient; import com.bigdata.service.jini.JiniFederation; import com.bigdata.striterator.ChunkedArrayIterator; import com.bigdata.striterator.Dechunkerator; +import com.bigdata.util.config.NicUtil; import com.ibm.icu.impl.ByteBuffer; /** @@ -133,71 +141,130 @@ // The separator key between the index partitions. private byte[] separatorKey; - private IQueryClient queryEngine; - private JiniClient<?> client; - private IDataService dataService0; - private IDataService dataService1; + /** The local persistence store for the {@link #queryEngine}. */ + private Journal queryEngineStore; + + /** The local {@link ResourceService} for the {@link #queryEngine}. */ + private ManagedResourceService queryEngineResourceService; + + /** The query controller. */ + private FederatedQueryEngine queryEngine; + + private IDataService dataService0; + private IDataService dataService1; + protected void setUp() throws Exception { - client = new JiniClient(new String[]{"/nas/bigdata/bigdata-0.83.2/dist/bigdata/var/config/jini/bigdataStandalone.config"}); - - final IBigdataFederation<?> fed = client.connect(); - - final int maxCount = 2; - UUID[] dataServices = null; - while((dataServices = fed.getDataServiceUUIDs(maxCount)).length < maxCount) { - System.err.println("Waiting for "+maxCount+" data services. There are "+dataServices.length+" discovered."); - Thread.sleep(250/*ms*/); - } - - super.setUp(); + /* + * FIXME This is hardcoded to a specific location in the file system. + * + * Also, the dependency on JiniClient means that we must move this test + * class into the bigdata-jini package. + */ + client = new JiniClient( + new String[] { "/nas/bigdata/bigdata-0.83.2/dist/bigdata/var/config/jini/bigdataStandalone.config" }); - dataService0 = fed.getDataService(dataServices[0]); - dataService1 = fed.getDataService(dataServices[1]); + final IBigdataFederation<?> fed = client.connect(); + + // create index manager for the query controller. { + final Properties p = new Properties(); + p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient + .toString()); + queryEngineStore = new Journal(p); + } + + // create resource service for the query controller. + { + queryEngineResourceService = new ManagedResourceService( + new InetSocketAddress(InetAddress + .getByName(NicUtil.getIpAddress("default.nic", + "default", true/* loopbackOk */)), 0/* port */ + ), 0/* requestServicePoolSize */) { - // @todo need to wait for the dataService to be running. -// assertTrue(((DataService) dataServer.getProxy()) -// .getResourceManager().awaitRunning()); - - // resolve the query engine on one of the data services. - while ((queryEngine = (IQueryClient) dataService0.getQueryEngine()) == null) { - - if (log.isInfoEnabled()) - log.info("Waiting for query engine on dataService0"); - - Thread.sleep(250); - - } - - System.err.println("controller: " + queryEngine); - + @Override + protected File getResource(UUID uuid) throws Exception { + // Will not serve up files. + return null; + } + }; } + + // create the query controller. + queryEngine = new FederatedQueryEngine(fed, queryEngineStore, + queryEngineResourceService); + + /* + * Discover the data services. We need their UUIDs in order to create + * the test relation split across an index partition located on each of + * the two data services. + */ + final int maxCount = 2; + UUID[] dataServices = null; + final long begin = System.currentTimeMillis(); + long elapsed = 0L; + while ((dataServices = fed.getDataServiceUUIDs(maxCount)).length < maxCount + && ((elapsed = System.currentTimeMillis() - begin) < TimeUnit.SECONDS + .toMillis(60))) { + System.err.println("Waiting for " + maxCount + + " data services. There are " + dataServices.length + + " discovered : elapsed=" + elapsed + "ms"); + Thread.sleep(250/* ms */); + } - // resolve the query engine on the other data services. - { + if (dataServices.length < maxCount) + throw new TimeoutException("Discovered " + dataServices.length + + " data services in " + elapsed + "ms but require " + + maxCount); + + super.setUp(); - IQueryPeer other = null; - -// assertTrue(((DataService) dataServer.getProxy()) -// .getResourceManager().awaitRunning()); - - while ((other = dataService1.getQueryEngine()) == null) { - - if (log.isInfoEnabled()) - log.info("Waiting for query engine on dataService1"); - - Thread.sleep(250); - - } +// dataService0 = fed.getDataService(dataServices[0]); +// dataService1 = fed.getDataService(dataServices[1]); +// { +// +// // @todo need to wait for the dataService to be running. +//// assertTrue(((DataService) dataServer.getProxy()) +//// .getResourceManager().awaitRunning()); +// +// // resolve the query engine on one of the data services. +// while ((queryEngine = (IQueryClient) dataService0.getQueryEngine()) == null) { +// +// if (log.isInfoEnabled()) +// log.info("Waiting for query engine on dataService0"); +// +// Thread.sleep(250); +// +// } +// +// System.err.println("controller: " + queryEngine); +// +// } +// +// // resolve the query engine on the other data services. +// { +// +// IQueryPeer other = null; +// +//// assertTrue(((DataService) dataServer.getProxy()) +//// .getResourceManager().awaitRunning()); +// +// while ((other = dataService1.getQueryEngine()) == null) { +// +// if (log.isInfoEnabled()) +// log.info("Waiting for query engine on dataService1"); +// +// Thread.sleep(250); +// +// } +// +// System.err.println("other : " + other); +// +// } - System.err.println("other : " + other); - - } - loadData(); } @@ -213,7 +280,18 @@ dataService0 = null; dataService1 = null; - queryEngine = null; + if (queryEngineResourceService != null) { + queryEngineResourceService.shutdownNow(); + queryEngineResourceService = null; + } + if (queryEngineStore != null) { + queryEngineStore.destroy(); + queryEngineStore = null; + } + if (queryEngine != null) { + queryEngine.shutdownNow(); + queryEngine = null; + } super.tearDown(); @@ -256,19 +334,21 @@ * Create the relation with the primary index key-range partitioned * using the given separator keys and data services. */ - - final R rel = new R(client.getFederation(), namespace, ITx.UNISOLATED, new Properties()); - if(client.getFederation() - .getResourceLocator().locate(namespace, ITx.UNISOLATED)==null) { - - rel.create(separatorKeys, dataServices); + final R rel = new R(client.getFederation(), namespace, ITx.UNISOLATED, + new Properties()); - /* - * Insert data into the appropriate index partitions. - */ - rel.insert(new ChunkedArrayIterator<E>(a.length, a, null/* keyOrder */)); - + if (client.getFederation().getResourceLocator().locate(namespace, + ITx.UNISOLATED) == null) { + + rel.create(separatorKeys, dataServices); + + /* + * Insert data into the appropriate index partitions. + */ + rel + .insert(new ChunkedArrayIterator<E>(a.length, a, null/* keyOrder */)); + } } @@ -314,20 +394,14 @@ final BindingSetPipelineOp query = new StartOp(new BOp[] {}, NV .asMap(new NV[] {// new NV(Predicate.Annotations.BOP_ID, startId),// -// new NV(Predicate.Annotations.READ_TIMESTAMP, ITx.READ_COMMITTED),// })); final long queryId = 1L; - final RunningQuery runningQuery = queryEngine.eval(queryId, query); + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + new LocalChunkMessage<IBindingSet>(queryEngine, queryId, + startId, -1 /* partitionId */, + newBindingSetIterator(new HashBindingSet()))); - runningQuery.startQuery(new BindingSetChunk( - queryEngine, - queryId, - startId,// - -1, //partitionId - new ThickAsynchronousIterator<IBindingSet[]>( - new IBindingSet[][] { new IBindingSet[] { new HashBindingSet()} }))); - // Wait until the query is done. final Map<Integer, BOpStats> statsMap = runningQuery.get(); { @@ -425,13 +499,12 @@ ) }; final long queryId = 1L; - final RunningQuery runningQuery = queryEngine.eval(queryId, query); + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + new LocalChunkMessage<IBindingSet>(queryEngine, queryId, + startId,// + -1, /* partitionId */ + newBindingSetIterator(new HashBindingSet()))); - runningQuery.startQuery(new BindingSetChunk(queryEngine, queryId, - startId,// - -1, // partitionId - newBindingSetIterator(new HashBindingSet()))); - // verify solutions. TestQueryEngine.assertSameSolutionsAnyOrder(expected, new Dechunkerator<IBindingSet>(runningQuery.iterator())); @@ -618,22 +691,23 @@ new NV(Predicate.Annotations.BOP_ID, sliceId),// })); + // start the query. final long queryId = 1L; - final RunningQuery runningQuery = queryEngine.eval(queryId, query); + final IChunkMessage<IBindingSet> initialChunkMessage; + { - // start the query. - { - final IBindingSet initialBindings = new HashBindingSet(); - + initialBindings.set(Var.var("x"), new Constant<String>("Mary")); - runningQuery.startQuery(new BindingSetChunk(queryEngine, queryId, - startId,// + initialChunkMessage = new LocalChunkMessage<IBindingSet>( + queryEngine, queryId, startId,// -1, // partitionId - newBindingSetIterator(initialBindings))); + newBindingSetIterator(initialBindings)); } + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + initialChunkMessage); // verify solutions. { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java 2010-09-14 20:48:21 UTC (rev 3554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java 2010-09-15 10:22:34 UTC (rev 3555) @@ -257,12 +257,6 @@ return null; } - @Override - public RunningQuery eval(long queryId, BindingSetPipelineOp query) - throws Exception, RemoteException { - return null; - } - } private static class MyNIOChunkMessage<E> extends NIOChunkMessage<E> { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java 2010-09-14 20:48:21 UTC (rev 3554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java 2010-09-15 10:22:34 UTC (rev 3555) @@ -180,12 +180,6 @@ throws RemoteException { return null; } - - @Override - public RunningQuery eval(long queryId, BindingSetPipelineOp query) - throws Exception, RemoteException { - return null; - } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |