From: <tho...@us...> - 2010-09-14 20:48:27
|
Revision: 3554 http://bigdata.svn.sourceforge.net/bigdata/?rev=3554&view=rev Author: thompsonbry Date: 2010-09-14 20:48:21 +0000 (Tue, 14 Sep 2010) Log Message: ----------- Working through RMI invocations on 2-DS cluster. IQueryClient#eval() was added, but it needs to be modified to pass along the initial IChunkMessage which gets query evaluation started and the unit tests need to be updated to reflect that change. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java 2010-09-14 20:48:21 UTC (rev 3554) @@ -0,0 +1,31 @@ +package com.bigdata.bop.bset; + +import java.util.Map; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; + +/** + * A version of {@link CopyBindingSetOp} which is always evaluated on the query + * controller. + */ +public class StartOp extends CopyBindingSetOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public StartOp(StartOp op) { + super(op); + } + + public StartOp(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + final public BOpEvaluationContext getEvaluationContext() { + return BOpEvaluationContext.CONTROLLER; + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-14 20:46:47 UTC (rev 3553) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-14 20:48:21 UTC (rev 3554) @@ -3,6 +3,7 @@ import java.rmi.RemoteException; import com.bigdata.bop.BindingSetPipelineOp; +import com.bigdata.bop.IBindingSet; /** * Interface for a client executing queries (the query controller). @@ -10,6 +11,25 @@ public interface IQueryClient extends IQueryPeer { /** + * Evaluate a query which visits {@link IBindingSet}s, such as a join. This + * node will serve as the controller for the query. + * + * @param queryId + * The unique identifier for the query. + * @param query + * The query to evaluate. + * + * @return An iterator visiting {@link IBindingSet}s which result from + * evaluating the query. + * + * @throws IllegalStateException + * if the {@link QueryEngine} has been {@link #shutdown()}. + * @throws Exception + * @throws RemoteException + */ + RunningQuery eval(long queryId, BindingSetPipelineOp query) throws Exception, RemoteException; + + /** * Return the query. * * @param queryId @@ -19,13 +39,13 @@ * @throws IllegalArgumentException * if there is no such query. */ - public BindingSetPipelineOp getQuery(long queryId) throws RemoteException; + BindingSetPipelineOp getQuery(long queryId) throws RemoteException; /** * Notify the client that execution has started for some query, operator, * node, and index partition. */ - public void startOp(StartOpMessage msg) + void startOp(StartOpMessage msg) throws RemoteException; /** @@ -33,6 +53,6 @@ * node, shard, and source binding set chunk(s). If execution halted * abnormally, then the cause is sent as well. */ - public void haltOp(HaltOpMessage msg) throws RemoteException; + void haltOp(HaltOpMessage msg) throws RemoteException; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-14 20:46:47 UTC (rev 3553) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-14 20:48:21 UTC (rev 3554) @@ -33,7 +33,7 @@ * @throws UnsupportedOperationException * unless running in scale-out. */ - void declareQuery(IQueryDecl queryDecl); + void declareQuery(IQueryDecl queryDecl) throws RemoteException; /** * Notify a service that a buffer having data for some {@link BOp} in some Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-09-14 20:46:47 UTC (rev 3553) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-09-14 20:48:21 UTC (rev 3554) @@ -27,7 +27,6 @@ package com.bigdata.bop.engine; -import com.bigdata.bop.BOp; import com.bigdata.btree.ILocalBTreeView; import com.bigdata.journal.IIndexManager; import com.bigdata.service.IBigdataFederation; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-14 20:46:47 UTC (rev 3553) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-14 20:48:21 UTC (rev 3554) @@ -734,22 +734,6 @@ } - /** - * Evaluate a query which visits {@link IBindingSet}s, such as a join. This - * node will serve as the controller for the query. - * - * @param queryId - * The unique identifier for the query. - * @param query - * The query to evaluate. - * - * @return An iterator visiting {@link IBindingSet}s which result from - * evaluating the query. - * - * @throws IllegalStateException - * if the {@link QueryEngine} has been {@link #shutdown()}. - * @throws Exception - */ public RunningQuery eval(final long queryId, final BindingSetPipelineOp query) throws Exception { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-14 20:46:47 UTC (rev 3553) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-14 20:48:21 UTC (rev 3554) @@ -37,7 +37,6 @@ import com.bigdata.bop.ArrayBindingSet; import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; -import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.Constant; import com.bigdata.bop.HashBindingSet; @@ -50,9 +49,10 @@ import com.bigdata.bop.ap.E; import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.ap.R; -import com.bigdata.bop.bset.CopyBindingSetOp; +import com.bigdata.bop.bset.StartOp; import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.BindingSetChunk; +import com.bigdata.bop.engine.IQueryClient; import com.bigdata.bop.engine.IQueryPeer; import com.bigdata.bop.engine.PipelineDelayOp; import com.bigdata.bop.engine.QueryEngine; @@ -62,15 +62,15 @@ import com.bigdata.bop.solutions.SliceOp; import com.bigdata.bop.solutions.SortOp; import com.bigdata.btree.keys.KeyBuilder; -import com.bigdata.jini.util.JiniUtil; +import com.bigdata.io.SerializerUtil; import com.bigdata.journal.ITx; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; -import com.bigdata.service.DataService; import com.bigdata.service.EmbeddedFederation; -import com.bigdata.service.jini.DataServer; +import com.bigdata.service.IBigdataFederation; +import com.bigdata.service.IDataService; +import com.bigdata.service.jini.JiniClient; import com.bigdata.service.jini.JiniFederation; -import com.bigdata.service.jini.util.JiniServicesHelper; import com.bigdata.striterator.ChunkedArrayIterator; import com.bigdata.striterator.Dechunkerator; import com.ibm.icu.impl.ByteBuffer; @@ -128,40 +128,43 @@ } // Namespace for the relation. - static private final String namespace = "ns"; + static private final String namespace = TestFederatedQueryEngine.class.getName(); // The separator key between the index partitions. private byte[] separatorKey; - private FederatedQueryEngine queryEngine; + private IQueryClient queryEngine; + private JiniClient<?> client; - private JiniServicesHelper helper; - -// private JiniClient<?> client; - + private IDataService dataService0; + private IDataService dataService1; + protected void setUp() throws Exception { + + client = new JiniClient(new String[]{"/nas/bigdata/bigdata-0.83.2/dist/bigdata/var/config/jini/bigdataStandalone.config"}); + + final IBigdataFederation<?> fed = client.connect(); - helper = new JiniServicesHelper(); - - // start services. - helper.start(); - -// // expose to subclasses. -// client = helper.client; - + final int maxCount = 2; + UUID[] dataServices = null; + while((dataServices = fed.getDataServiceUUIDs(maxCount)).length < maxCount) { + System.err.println("Waiting for "+maxCount+" data services. There are "+dataServices.length+" discovered."); + Thread.sleep(250/*ms*/); + } + super.setUp(); - + + dataService0 = fed.getDataService(dataServices[0]); + dataService1 = fed.getDataService(dataServices[1]); { - - final DataServer dataServer = helper.dataServer0; - - assertTrue(((DataService) dataServer.getProxy()) - .getResourceManager().awaitRunning()); + // @todo need to wait for the dataService to be running. +// assertTrue(((DataService) dataServer.getProxy()) +// .getResourceManager().awaitRunning()); + // resolve the query engine on one of the data services. - while ((queryEngine = (FederatedQueryEngine) ((DataService) dataServer - .getProxy()).getQueryEngine()) == null) { + while ((queryEngine = (IQueryClient) dataService0.getQueryEngine()) == null) { if (log.isInfoEnabled()) log.info("Waiting for query engine on dataService0"); @@ -175,17 +178,14 @@ } // resolve the query engine on the other data services. - if (helper.dataServer1 != null) { + { - final DataServer dataServer = helper.dataServer1; - IQueryPeer other = null; - assertTrue(((DataService) dataServer.getProxy()) - .getResourceManager().awaitRunning()); +// assertTrue(((DataService) dataServer.getProxy()) +// .getResourceManager().awaitRunning()); - while ((other = ((DataService) dataServer.getProxy()) - .getQueryEngine()) == null) { + while ((other = dataService1.getQueryEngine()) == null) { if (log.isInfoEnabled()) log.info("Waiting for query engine on dataService1"); @@ -207,9 +207,13 @@ // clear reference. separatorKey = null; - helper.destroy(); + client.disconnect(true/*immediateShutdown*/); + client = null; + + dataService0 = null; + dataService1 = null; - helper = null; + queryEngine = null; super.tearDown(); @@ -244,8 +248,8 @@ }; final UUID[] dataServices = new UUID[] {// - JiniUtil.serviceID2UUID(helper.dataServer0.getServiceID()),// - JiniUtil.serviceID2UUID(helper.dataServer1.getServiceID()),// + dataService0.getServiceUUID(),// + dataService1.getServiceUUID(),// }; /* @@ -253,14 +257,19 @@ * using the given separator keys and data services. */ - final R rel = new R(helper.getFederation(), namespace, ITx.UNISOLATED, new Properties()); + final R rel = new R(client.getFederation(), namespace, ITx.UNISOLATED, new Properties()); + if(client.getFederation() + .getResourceLocator().locate(namespace, ITx.UNISOLATED)==null) { + rel.create(separatorKeys, dataServices); /* * Insert data into the appropriate index partitions. */ rel.insert(new ChunkedArrayIterator<E>(a.length, a, null/* keyOrder */)); + + } } @@ -302,18 +311,12 @@ public void test_query_startRun() throws Exception { final int startId = 1; - final BindingSetPipelineOp query = new CopyBindingSetOp(new BOp[] {}, NV + final BindingSetPipelineOp query = new StartOp(new BOp[] {}, NV .asMap(new NV[] {// new NV(Predicate.Annotations.BOP_ID, startId),// // new NV(Predicate.Annotations.READ_TIMESTAMP, ITx.READ_COMMITTED),// - })){ - private static final long serialVersionUID = 1L; + })); - public BOpEvaluationContext getEvaluationContext() { - return BOpEvaluationContext.CONTROLLER; - } - }; - final long queryId = 1L; final RunningQuery runningQuery = queryEngine.eval(queryId, query); @@ -379,7 +382,7 @@ final BindingSetPipelineOp query = new SliceOp(new BOp[]{new PipelineJoin<E>( // left - new CopyBindingSetOp(new BOp[] {}, NV.asMap(new NV[] {// + new StartOp(new BOp[] {}, NV.asMap(new NV[] {// new NV(Predicate.Annotations.BOP_ID, startId),// })), // right @@ -563,7 +566,7 @@ final int predId2 = 5; final int sliceId = 6; - final BindingSetPipelineOp startOp = new CopyBindingSetOp(new BOp[] {}, + final BindingSetPipelineOp startOp = new StartOp(new BOp[] {}, NV.asMap(new NV[] {// new NV(Predicate.Annotations.BOP_ID, startId),// })); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java 2010-09-14 20:46:47 UTC (rev 3553) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java 2010-09-14 20:48:21 UTC (rev 3554) @@ -46,6 +46,7 @@ import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.IQueryClient; import com.bigdata.bop.engine.IQueryDecl; +import com.bigdata.bop.engine.RunningQuery; import com.bigdata.bop.engine.StartOpMessage; import com.bigdata.io.DirectBufferPoolAllocator.IAllocationContext; import com.bigdata.relation.accesspath.BlockingBuffer; @@ -255,6 +256,12 @@ throws RemoteException { return null; } + + @Override + public RunningQuery eval(long queryId, BindingSetPipelineOp query) + throws Exception, RemoteException { + return null; + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java 2010-09-14 20:46:47 UTC (rev 3553) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java 2010-09-14 20:48:21 UTC (rev 3554) @@ -43,6 +43,7 @@ import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.IQueryClient; import com.bigdata.bop.engine.IQueryDecl; +import com.bigdata.bop.engine.RunningQuery; import com.bigdata.bop.engine.StartOpMessage; import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IBlockingBuffer; @@ -180,6 +181,12 @@ return null; } + @Override + public RunningQuery eval(long queryId, BindingSetPipelineOp query) + throws Exception, RemoteException { + return null; + } + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |