|
From: <tho...@us...> - 2010-09-15 10:22:41
|
Revision: 3555
http://bigdata.svn.sourceforge.net/bigdata/?rev=3555&view=rev
Author: thompsonbry
Date: 2010-09-15 10:22:34 +0000 (Wed, 15 Sep 2010)
Log Message:
-----------
Modified QueryEngine#eval() to accept the initial binding set chunk.
Modified TestFederatedQueryEngine to create a local query controller to which the queries are submitted.
Modified Paths:
--------------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java
Added Paths:
-----------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java
Removed Paths:
-------------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java
Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java 2010-09-14 20:48:21 UTC (rev 3554)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java 2010-09-15 10:22:34 UTC (rev 3555)
@@ -1,108 +0,0 @@
-package com.bigdata.bop.engine;
-
-import java.io.Serializable;
-
-import com.bigdata.bop.BOp;
-import com.bigdata.bop.fed.FederatedRunningQuery;
-import com.bigdata.relation.accesspath.IAsynchronousIterator;
-
-/**
- * An non-{@link Serializable} chunk of intermediate results which are ready to
- * be consumed by some {@link BOp} in a specific query (this is only used in
- * query evaluation for the standalone database).
- */
-public class BindingSetChunk<E> implements IChunkMessage<E> {
-
- /** The query controller. */
- private final IQueryClient queryController;
-
- /**
- * The query identifier.
- */
- private final long queryId;
-
- /**
- * The target {@link BOp}.
- */
- private final int bopId;
-
- /**
- * The index partition which is being targeted for that {@link BOp}.
- */
- private final int partitionId;
-
- /**
- * The binding sets to be consumed by that {@link BOp}.
- */
- private IAsynchronousIterator<E[]> source;
-
- public IQueryClient getQueryController() {
- return queryController;
- }
-
- public long getQueryId() {
- return queryId;
- }
-
- public int getBOpId() {
- return bopId;
- }
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public boolean isMaterialized() {
- return true;
- }
-
- public BindingSetChunk(final IQueryClient queryController,
- final long queryId, final int bopId, final int partitionId,
- final IAsynchronousIterator<E[]> source) {
-
- if (queryController == null)
- throw new IllegalArgumentException();
-
- if (source == null)
- throw new IllegalArgumentException();
-
- this.queryController = queryController;
-
- this.queryId = queryId;
-
- this.bopId = bopId;
-
- this.partitionId = partitionId;
-
- this.source = source;
-
- }
-
- public String toString() {
-
- return getClass().getName() + "{queryId=" + queryId + ",bopId=" + bopId
- + ",partitionId=" + partitionId + "}";
-
- }
-
- public void materialize(FederatedRunningQuery runningQuery) {
- // NOP
- }
-
- public void release() {
- // NOP
- }
-
- public IChunkAccessor<E> getChunkAccessor() {
- return new ChunkAccessor();
- }
-
- private class ChunkAccessor implements IChunkAccessor<E> {
-
- public IAsynchronousIterator<E[]> iterator() {
- return source;
- }
-
- }
-
-}
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-14 20:48:21 UTC (rev 3554)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-15 10:22:34 UTC (rev 3555)
@@ -3,7 +3,6 @@
import java.rmi.RemoteException;
import com.bigdata.bop.BindingSetPipelineOp;
-import com.bigdata.bop.IBindingSet;
/**
* Interface for a client executing queries (the query controller).
@@ -11,25 +10,6 @@
public interface IQueryClient extends IQueryPeer {
/**
- * Evaluate a query which visits {@link IBindingSet}s, such as a join. This
- * node will serve as the controller for the query.
- *
- * @param queryId
- * The unique identifier for the query.
- * @param query
- * The query to evaluate.
- *
- * @return An iterator visiting {@link IBindingSet}s which result from
- * evaluating the query.
- *
- * @throws IllegalStateException
- * if the {@link QueryEngine} has been {@link #shutdown()}.
- * @throws Exception
- * @throws RemoteException
- */
- RunningQuery eval(long queryId, BindingSetPipelineOp query) throws Exception, RemoteException;
-
- /**
* Return the query.
*
* @param queryId
Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java (from rev 3554, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java)
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java 2010-09-15 10:22:34 UTC (rev 3555)
@@ -0,0 +1,110 @@
+package com.bigdata.bop.engine;
+
+import java.io.Serializable;
+
+import com.bigdata.bop.BOp;
+import com.bigdata.bop.fed.FederatedRunningQuery;
+import com.bigdata.relation.accesspath.IAsynchronousIterator;
+
+/**
+ * A non-{@link Serializable} chunk of intermediate results which are ready to
+ * be consumed by some {@link BOp} in a specific query (this is only used in
+ * query evaluation for the standalone database).
+ *
+ * @todo test suite
+ */
+public class LocalChunkMessage<E> implements IChunkMessage<E> {
+
+ /** The query controller. */
+ private final IQueryClient queryController;
+
+ /**
+ * The query identifier.
+ */
+ private final long queryId;
+
+ /**
+ * The target {@link BOp}.
+ */
+ private final int bopId;
+
+ /**
+ * The index partition which is being targeted for that {@link BOp}.
+ */
+ private final int partitionId;
+
+ /**
+ * The binding sets to be consumed by that {@link BOp}.
+ */
+ private IAsynchronousIterator<E[]> source;
+
+ public IQueryClient getQueryController() {
+ return queryController;
+ }
+
+ public long getQueryId() {
+ return queryId;
+ }
+
+ public int getBOpId() {
+ return bopId;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public boolean isMaterialized() {
+ return true;
+ }
+
+ public LocalChunkMessage(final IQueryClient queryController,
+ final long queryId, final int bopId, final int partitionId,
+ final IAsynchronousIterator<E[]> source) {
+
+ if (queryController == null)
+ throw new IllegalArgumentException();
+
+ if (source == null)
+ throw new IllegalArgumentException();
+
+ this.queryController = queryController;
+
+ this.queryId = queryId;
+
+ this.bopId = bopId;
+
+ this.partitionId = partitionId;
+
+ this.source = source;
+
+ }
+
+ public String toString() {
+
+ return getClass().getName() + "{queryId=" + queryId + ",bopId=" + bopId
+ + ",partitionId=" + partitionId + "}";
+
+ }
+
+ public void materialize(FederatedRunningQuery runningQuery) {
+ // NOP
+ }
+
+ public void release() {
+ // NOP
+ }
+
+ public IChunkAccessor<E> getChunkAccessor() {
+ return new ChunkAccessor();
+ }
+
+ private class ChunkAccessor implements IChunkAccessor<E> {
+
+ public IAsynchronousIterator<E[]> iterator() {
+ return source;
+ }
+
+ }
+
+}
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-14 20:48:21 UTC (rev 3554)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-15 10:22:34 UTC (rev 3555)
@@ -45,6 +45,7 @@
import com.bigdata.bop.IBindingSet;
import com.bigdata.bop.IPredicate;
import com.bigdata.bop.bset.Union;
+import com.bigdata.bop.fed.FederatedQueryEngine;
import com.bigdata.btree.BTree;
import com.bigdata.btree.IndexSegment;
import com.bigdata.btree.view.FusedView;
@@ -500,7 +501,7 @@
* evaluation.
* <p>
* Chunk concatenation could be performed here if we (a) mark the
- * {@link BindingSetChunk} with a flag to indicate when it has been
+ * {@link LocalChunkMessage} with a flag to indicate when it has been
* accepted; and (b) rip through the incoming chunks for the query for
* the target bop and combine them to feed the task. Chunks which have
* already been assigned would be dropped when take() discovers them.
@@ -734,18 +735,57 @@
}
+ /**
+ * Evaluate a query which visits {@link IBindingSet}s, such as a join. This
+ * node will serve as the controller for the query.
+ *
+ * @param queryId
+ * The unique identifier for the query.
+ * @param query
+ * The query to evaluate.
+ *
+ * @return The {@link RunningQuery}, which is used to wait for the query
+ * (get()) or to drain its solutions (iterator()).
+ * @param msg
+ * A message providing access to the initial {@link IBindingSet
+ * binding set(s)} used to begin query evaluation.
+ *
+ * @throws IllegalStateException
+ * if the {@link QueryEngine} has been {@link #shutdown()}.
+ * @throws Exception
+ * @throws RemoteException
+ *
+ * FIXME The test suites need to be modified to create a local
+ * {@link FederatedQueryEngine} object which fronts for an
+ * {@link IIndexManager} which is local to the client - not on a
+ * data service at all. This is necessary in order for the unit
+ * test (or application code) to directly access the
+ * RunningQuery reference, which is needed to use get() (to wait
+ * for the query), iterator() (to drain the query), etc.
+ * <p>
+ * This will also give us a place to hang query-local resources
+ * on the client.
+ * <p>
+ * This has to be a {@link FederatedQueryEngine} because it
+ * needs to talk to a federation. There should be nothing DS
+ * specific about the {@link FederatedQueryEngine}.
+ */
public RunningQuery eval(final long queryId,
- final BindingSetPipelineOp query) throws Exception {
+ final BindingSetPipelineOp query,
+ final IChunkMessage<IBindingSet> msg) throws Exception {
if (query == null)
throw new IllegalArgumentException();
+ if (msg == null)
+ throw new IllegalArgumentException();
+
+ if (queryId != msg.getQueryId()) // @todo use equals() to compare UUIDs.
+ throw new IllegalArgumentException();
+
final RunningQuery runningQuery = newRunningQuery(this, queryId,
-// System.currentTimeMillis()/* begin */,
true/* controller */, this/* clientProxy */, query);
- assertRunning();
-
final long timeout = query.getProperty(BOp.Annotations.TIMEOUT,
BOp.Annotations.DEFAULT_TIMEOUT);
@@ -767,8 +807,12 @@
}
+ assertRunning();
+
putRunningQuery(queryId, runningQuery);
+ runningQuery.startQuery(msg);
+
return runningQuery;
}
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-14 20:48:21 UTC (rev 3554)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-15 10:22:34 UTC (rev 3555)
@@ -366,7 +366,7 @@
/*
* Note: The partitionId will always be -1 in scale-up.
*/
- final BindingSetChunk<IBindingSet> chunk = new BindingSetChunk<IBindingSet>(
+ final LocalChunkMessage<IBindingSet> chunk = new LocalChunkMessage<IBindingSet>(
clientProxy, queryId, sinkId, -1/* partitionId */, sink
.iterator());
@@ -646,11 +646,9 @@
/**
* Invoked once by the query controller with the initial
- * {@link BindingSetChunk} which gets the query moving.
- *
- * @todo this should reject multiple invocations for a given query instance.
+ * {@link IChunkMessage} which gets the query moving.
*/
- public void startQuery(final IChunkMessage<IBindingSet> msg) {
+ void startQuery(final IChunkMessage<IBindingSet> msg) {
if (!controller)
throw new UnsupportedOperationException();
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-14 20:48:21 UTC (rev 3554)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-15 10:22:34 UTC (rev 3555)
@@ -122,20 +122,6 @@
}
/**
- * Constructor used on a {@link DataService} (a query engine peer).
- *
- * @param dataService
- * The data service.
- */
- public FederatedQueryEngine(final DataService dataService) {
-
- this(dataService.getFederation(),
- new DelegateIndexManager(dataService), dataService
- .getResourceManager().getResourceService());
-
- }
-
- /**
* Overridden to strengthen the return type.
* <p>
* {@inheritDoc}
@@ -152,6 +138,20 @@
return getClass().getName() + "{serviceUUID=" + getServiceUUID() + "}";
}
+
+ /**
+ * Constructor used on a {@link DataService} (a query engine peer).
+ *
+ * @param dataService
+ * The data service.
+ */
+ public FederatedQueryEngine(final DataService dataService) {
+
+ this(dataService.getFederation(),
+ new DelegateIndexManager(dataService), dataService
+ .getResourceManager().getResourceService());
+
+ }
/**
* Constructor used on a non-{@link DataService} node to expose a query
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-14 20:48:21 UTC (rev 3554)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-15 10:22:34 UTC (rev 3555)
@@ -42,7 +42,7 @@
import com.bigdata.bop.IBindingSet;
import com.bigdata.bop.IPredicate;
import com.bigdata.bop.IShardwisePipelineOp;
-import com.bigdata.bop.engine.BindingSetChunk;
+import com.bigdata.bop.engine.LocalChunkMessage;
import com.bigdata.bop.engine.IChunkMessage;
import com.bigdata.bop.engine.IQueryClient;
import com.bigdata.bop.engine.IQueryPeer;
@@ -94,7 +94,7 @@
* A map associating resources with running queries. When a query halts, the
* resources listed in its resource map are released. Resources can include
* {@link ByteBuffer}s backing either incoming or outgoing
- * {@link BindingSetChunk}s, temporary files associated with the query, hash
+ * {@link LocalChunkMessage}s, temporary files associated with the query, hash
* tables, etc.
*
* @todo This map will eventually need to be moved into {@link RunningQuery}
@@ -604,7 +604,7 @@
* query engine.
*/
- final IChunkMessage<IBindingSet> msg = new BindingSetChunk<IBindingSet>(
+ final IChunkMessage<IBindingSet> msg = new LocalChunkMessage<IBindingSet>(
getQueryController(), getQueryId(), sinkId, partitionId,
source.iterator());
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-14 20:48:21 UTC (rev 3554)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-15 10:22:34 UTC (rev 3555)
@@ -198,16 +198,11 @@
}));
final long queryId = 1L;
- final RunningQuery runningQuery = queryEngine.eval(queryId,query);
+ final RunningQuery runningQuery = queryEngine.eval(queryId, query,
+ new LocalChunkMessage<IBindingSet>(queryEngine, queryId,
+ startId, -1/* partitionId */,
+ newBindingSetIterator(new HashBindingSet())));
- runningQuery.startQuery(new BindingSetChunk(
- queryEngine,
- queryId,
- startId,//
- -1, //partitionId
- new ThickAsynchronousIterator<IBindingSet[]>(
- new IBindingSet[][] { new IBindingSet[] { new HashBindingSet()} })));
-
// Wait until the query is done.
final Map<Integer, BOpStats> statsMap = runningQuery.get();
{
@@ -290,13 +285,11 @@
) };
final long queryId = 1L;
- final RunningQuery runningQuery = queryEngine.eval(queryId, query);
+ final RunningQuery runningQuery = queryEngine.eval(queryId, query,
+ new LocalChunkMessage<IBindingSet>(queryEngine, queryId,
+ startId, -1 /* partitionId */,
+ newBindingSetIterator(new HashBindingSet())));
- runningQuery.startQuery(new BindingSetChunk(queryEngine, queryId,
- startId,//
- -1, // partitionId
- newBindingSetIterator(new HashBindingSet())));
-
// verify solutions.
assertSameSolutions(expected, runningQuery.iterator());
@@ -478,21 +471,22 @@
final BindingSetPipelineOp query = join2Op;
- final long queryId = 1L;
- final RunningQuery runningQuery = queryEngine.eval(queryId, query);
-
// start the query.
+ final long queryId = 1L;
+ final IChunkMessage<IBindingSet> initialChunkMessage;
{
-
+
final IBindingSet initialBindings = new HashBindingSet();
-
+
initialBindings.set(Var.var("x"), new Constant<String>("Mary"));
- runningQuery.startQuery(new BindingSetChunk<IBindingSet>(
- queryEngine, queryId, startId,//
+ initialChunkMessage = new LocalChunkMessage<IBindingSet>(queryEngine,
+ queryId, startId,//
-1, // partitionId
- newBindingSetIterator(initialBindings)));
+ newBindingSetIterator(initialBindings));
}
+ final RunningQuery runningQuery = queryEngine.eval(queryId, query,
+ initialChunkMessage);
// verify solutions.
{
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-14 20:48:21 UTC (rev 3554)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-15 10:22:34 UTC (rev 3555)
@@ -27,10 +27,15 @@
package com.bigdata.bop.fed;
+import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import junit.framework.TestCase2;
@@ -51,9 +56,8 @@
import com.bigdata.bop.ap.R;
import com.bigdata.bop.bset.StartOp;
import com.bigdata.bop.engine.BOpStats;
-import com.bigdata.bop.engine.BindingSetChunk;
-import com.bigdata.bop.engine.IQueryClient;
-import com.bigdata.bop.engine.IQueryPeer;
+import com.bigdata.bop.engine.IChunkMessage;
+import com.bigdata.bop.engine.LocalChunkMessage;
import com.bigdata.bop.engine.PipelineDelayOp;
import com.bigdata.bop.engine.QueryEngine;
import com.bigdata.bop.engine.RunningQuery;
@@ -62,17 +66,21 @@
import com.bigdata.bop.solutions.SliceOp;
import com.bigdata.bop.solutions.SortOp;
import com.bigdata.btree.keys.KeyBuilder;
-import com.bigdata.io.SerializerUtil;
+import com.bigdata.journal.BufferMode;
import com.bigdata.journal.ITx;
+import com.bigdata.journal.Journal;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.relation.accesspath.ThickAsynchronousIterator;
import com.bigdata.service.EmbeddedFederation;
import com.bigdata.service.IBigdataFederation;
import com.bigdata.service.IDataService;
+import com.bigdata.service.ManagedResourceService;
+import com.bigdata.service.ResourceService;
import com.bigdata.service.jini.JiniClient;
import com.bigdata.service.jini.JiniFederation;
import com.bigdata.striterator.ChunkedArrayIterator;
import com.bigdata.striterator.Dechunkerator;
+import com.bigdata.util.config.NicUtil;
import com.ibm.icu.impl.ByteBuffer;
/**
@@ -133,71 +141,130 @@
// The separator key between the index partitions.
private byte[] separatorKey;
- private IQueryClient queryEngine;
-
private JiniClient<?> client;
- private IDataService dataService0;
- private IDataService dataService1;
+ /** The local persistence store for the {@link #queryEngine}. */
+ private Journal queryEngineStore;
+
+ /** The local {@link ResourceService} for the {@link #queryEngine}. */
+ private ManagedResourceService queryEngineResourceService;
+
+ /** The query controller. */
+ private FederatedQueryEngine queryEngine;
+
+ private IDataService dataService0;
+ private IDataService dataService1;
+
protected void setUp() throws Exception {
- client = new JiniClient(new String[]{"/nas/bigdata/bigdata-0.83.2/dist/bigdata/var/config/jini/bigdataStandalone.config"});
-
- final IBigdataFederation<?> fed = client.connect();
-
- final int maxCount = 2;
- UUID[] dataServices = null;
- while((dataServices = fed.getDataServiceUUIDs(maxCount)).length < maxCount) {
- System.err.println("Waiting for "+maxCount+" data services. There are "+dataServices.length+" discovered.");
- Thread.sleep(250/*ms*/);
- }
-
- super.setUp();
+ /*
+ * FIXME This is hardcoded to a specific location in the file system.
+ *
+ * Also, the dependency on JiniClient means that we must move this test
+ * class into the bigdata-jini package.
+ */
+ client = new JiniClient(
+ new String[] { "/nas/bigdata/bigdata-0.83.2/dist/bigdata/var/config/jini/bigdataStandalone.config" });
- dataService0 = fed.getDataService(dataServices[0]);
- dataService1 = fed.getDataService(dataServices[1]);
+ final IBigdataFederation<?> fed = client.connect();
+
+ // create index manager for the query controller.
{
+ final Properties p = new Properties();
+ p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient
+ .toString());
+ queryEngineStore = new Journal(p);
+ }
+
+ // create resource service for the query controller.
+ {
+ queryEngineResourceService = new ManagedResourceService(
+ new InetSocketAddress(InetAddress
+ .getByName(NicUtil.getIpAddress("default.nic",
+ "default", true/* loopbackOk */)), 0/* port */
+ ), 0/* requestServicePoolSize */) {
- // @todo need to wait for the dataService to be running.
-// assertTrue(((DataService) dataServer.getProxy())
-// .getResourceManager().awaitRunning());
-
- // resolve the query engine on one of the data services.
- while ((queryEngine = (IQueryClient) dataService0.getQueryEngine()) == null) {
-
- if (log.isInfoEnabled())
- log.info("Waiting for query engine on dataService0");
-
- Thread.sleep(250);
-
- }
-
- System.err.println("controller: " + queryEngine);
-
+ @Override
+ protected File getResource(UUID uuid) throws Exception {
+ // Will not serve up files.
+ return null;
+ }
+ };
}
+
+ // create the query controller.
+ queryEngine = new FederatedQueryEngine(fed, queryEngineStore,
+ queryEngineResourceService);
+
+ /*
+ * Discover the data services. We need their UUIDs in order to create
+ * the test relation split across an index partition located on each of
+ * the two data services.
+ */
+ final int maxCount = 2;
+ UUID[] dataServices = null;
+ final long begin = System.currentTimeMillis();
+ long elapsed = 0L;
+ while ((dataServices = fed.getDataServiceUUIDs(maxCount)).length < maxCount
+ && ((elapsed = System.currentTimeMillis() - begin) < TimeUnit.SECONDS
+ .toMillis(60))) {
+ System.err.println("Waiting for " + maxCount
+ + " data services. There are " + dataServices.length
+ + " discovered : elapsed=" + elapsed + "ms");
+ Thread.sleep(250/* ms */);
+ }
- // resolve the query engine on the other data services.
- {
+ if (dataServices.length < maxCount)
+ throw new TimeoutException("Discovered " + dataServices.length
+ + " data services in " + elapsed + "ms but require "
+ + maxCount);
+
+ super.setUp();
- IQueryPeer other = null;
-
-// assertTrue(((DataService) dataServer.getProxy())
-// .getResourceManager().awaitRunning());
-
- while ((other = dataService1.getQueryEngine()) == null) {
-
- if (log.isInfoEnabled())
- log.info("Waiting for query engine on dataService1");
-
- Thread.sleep(250);
-
- }
+// dataService0 = fed.getDataService(dataServices[0]);
+// dataService1 = fed.getDataService(dataServices[1]);
+// {
+//
+// // @todo need to wait for the dataService to be running.
+//// assertTrue(((DataService) dataServer.getProxy())
+//// .getResourceManager().awaitRunning());
+//
+// // resolve the query engine on one of the data services.
+// while ((queryEngine = (IQueryClient) dataService0.getQueryEngine()) == null) {
+//
+// if (log.isInfoEnabled())
+// log.info("Waiting for query engine on dataService0");
+//
+// Thread.sleep(250);
+//
+// }
+//
+// System.err.println("controller: " + queryEngine);
+//
+// }
+//
+// // resolve the query engine on the other data services.
+// {
+//
+// IQueryPeer other = null;
+//
+//// assertTrue(((DataService) dataServer.getProxy())
+//// .getResourceManager().awaitRunning());
+//
+// while ((other = dataService1.getQueryEngine()) == null) {
+//
+// if (log.isInfoEnabled())
+// log.info("Waiting for query engine on dataService1");
+//
+// Thread.sleep(250);
+//
+// }
+//
+// System.err.println("other : " + other);
+//
+// }
- System.err.println("other : " + other);
-
- }
-
loadData();
}
@@ -213,7 +280,18 @@
dataService0 = null;
dataService1 = null;
- queryEngine = null;
+ if (queryEngineResourceService != null) {
+ queryEngineResourceService.shutdownNow();
+ queryEngineResourceService = null;
+ }
+ if (queryEngineStore != null) {
+ queryEngineStore.destroy();
+ queryEngineStore = null;
+ }
+ if (queryEngine != null) {
+ queryEngine.shutdownNow();
+ queryEngine = null;
+ }
super.tearDown();
@@ -256,19 +334,21 @@
* Create the relation with the primary index key-range partitioned
* using the given separator keys and data services.
*/
-
- final R rel = new R(client.getFederation(), namespace, ITx.UNISOLATED, new Properties());
- if(client.getFederation()
- .getResourceLocator().locate(namespace, ITx.UNISOLATED)==null) {
-
- rel.create(separatorKeys, dataServices);
+ final R rel = new R(client.getFederation(), namespace, ITx.UNISOLATED,
+ new Properties());
- /*
- * Insert data into the appropriate index partitions.
- */
- rel.insert(new ChunkedArrayIterator<E>(a.length, a, null/* keyOrder */));
-
+ if (client.getFederation().getResourceLocator().locate(namespace,
+ ITx.UNISOLATED) == null) {
+
+ rel.create(separatorKeys, dataServices);
+
+ /*
+ * Insert data into the appropriate index partitions.
+ */
+ rel
+ .insert(new ChunkedArrayIterator<E>(a.length, a, null/* keyOrder */));
+
}
}
@@ -314,20 +394,14 @@
final BindingSetPipelineOp query = new StartOp(new BOp[] {}, NV
.asMap(new NV[] {//
new NV(Predicate.Annotations.BOP_ID, startId),//
-// new NV(Predicate.Annotations.READ_TIMESTAMP, ITx.READ_COMMITTED),//
}));
final long queryId = 1L;
- final RunningQuery runningQuery = queryEngine.eval(queryId, query);
+ final RunningQuery runningQuery = queryEngine.eval(queryId, query,
+ new LocalChunkMessage<IBindingSet>(queryEngine, queryId,
+ startId, -1 /* partitionId */,
+ newBindingSetIterator(new HashBindingSet())));
- runningQuery.startQuery(new BindingSetChunk(
- queryEngine,
- queryId,
- startId,//
- -1, //partitionId
- new ThickAsynchronousIterator<IBindingSet[]>(
- new IBindingSet[][] { new IBindingSet[] { new HashBindingSet()} })));
-
// Wait until the query is done.
final Map<Integer, BOpStats> statsMap = runningQuery.get();
{
@@ -425,13 +499,12 @@
) };
final long queryId = 1L;
- final RunningQuery runningQuery = queryEngine.eval(queryId, query);
+ final RunningQuery runningQuery = queryEngine.eval(queryId, query,
+ new LocalChunkMessage<IBindingSet>(queryEngine, queryId,
+ startId,//
+ -1, /* partitionId */
+ newBindingSetIterator(new HashBindingSet())));
- runningQuery.startQuery(new BindingSetChunk(queryEngine, queryId,
- startId,//
- -1, // partitionId
- newBindingSetIterator(new HashBindingSet())));
-
// verify solutions.
TestQueryEngine.assertSameSolutionsAnyOrder(expected,
new Dechunkerator<IBindingSet>(runningQuery.iterator()));
@@ -618,22 +691,23 @@
new NV(Predicate.Annotations.BOP_ID, sliceId),//
}));
+ // start the query.
final long queryId = 1L;
- final RunningQuery runningQuery = queryEngine.eval(queryId, query);
+ final IChunkMessage<IBindingSet> initialChunkMessage;
+ {
- // start the query.
- {
-
final IBindingSet initialBindings = new HashBindingSet();
-
+
initialBindings.set(Var.var("x"), new Constant<String>("Mary"));
- runningQuery.startQuery(new BindingSetChunk(queryEngine, queryId,
- startId,//
+ initialChunkMessage = new LocalChunkMessage<IBindingSet>(
+ queryEngine, queryId, startId,//
-1, // partitionId
- newBindingSetIterator(initialBindings)));
+ newBindingSetIterator(initialBindings));
}
+ final RunningQuery runningQuery = queryEngine.eval(queryId, query,
+ initialChunkMessage);
// verify solutions.
{
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java 2010-09-14 20:48:21 UTC (rev 3554)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java 2010-09-15 10:22:34 UTC (rev 3555)
@@ -257,12 +257,6 @@
return null;
}
- @Override
- public RunningQuery eval(long queryId, BindingSetPipelineOp query)
- throws Exception, RemoteException {
- return null;
- }
-
}
private static class MyNIOChunkMessage<E> extends NIOChunkMessage<E> {
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java 2010-09-14 20:48:21 UTC (rev 3554)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java 2010-09-15 10:22:34 UTC (rev 3555)
@@ -180,12 +180,6 @@
throws RemoteException {
return null;
}
-
- @Override
- public RunningQuery eval(long queryId, BindingSetPipelineOp query)
- throws Exception, RemoteException {
- return null;
- }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|