From: Bryan T. <tho...@us...> - 2007-04-23 18:58:41
Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/service
In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv2382/src/test/com/bigdata/service

Modified Files:
	TestMetadataServer0.java
Log Message:
Refactored the MetadataIndex to extend UnisolatedBTree and put it to some use in the MetadataService and validated aspects of its use in the test case for the metadata service.

Index: TestMetadataServer0.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/service/TestMetadataServer0.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -C2 -d -r1.2 -r1.3
*** TestMetadataServer0.java	23 Apr 2007 17:22:16 -0000	1.2
--- TestMetadataServer0.java	23 Apr 2007 18:58:38 -0000	1.3
***************
*** 48,59 ****
  package com.bigdata.service;
  
- import java.rmi.RemoteException;
  import java.util.UUID;
  import java.util.concurrent.ExecutionException;
  
- import com.bigdata.scaleup.PartitionMetadata;
- 
  import net.jini.core.lookup.ServiceID;
  
  /**
   * Test ability to launch, register, discover and use a {@link MetadataService}
--- 48,59 ----
  package com.bigdata.service;
  
  import java.util.UUID;
  import java.util.concurrent.ExecutionException;
  
  import net.jini.core.lookup.ServiceID;
  
+ import com.bigdata.io.SerializerUtil;
+ import com.bigdata.scaleup.IPartitionMetadata;
+ 
  /**
   * Test ability to launch, register, discover and use a {@link MetadataService}
***************
*** 297,304 ****
      ServiceID dataService0ID = getServiceID(dataServer0);
  
-     // lookup proxy for dataService0
-     final IDataService dataService0Proxy = lookupDataService(dataService0ID);
- 
      try {
  
          /*
           * This should fail since the index was never registered.
--- 297,305 ----
      ServiceID dataService0ID = getServiceID(dataServer0);
  
      try {
+ 
+         // lookup proxy for dataService0
+         final IDataService dataService0Proxy = lookupDataService(dataService0ID);
+ 
          /*
           * This should fail since the index was never registered.
***************
*** 330,346 ****
      /*
!      * @todo request the partition for the scale-out index, figure out the
!      * data service for that partition, and make sure that an index was
!      * created on that data service for the partition.
       */
!     // @todo encapsulate in method to generate metadata index name.
!     byte[] val = metadataServiceProxy.lookup(IDataService.UNISOLATED,
!             MetadataService.getMetadataName(indexName), new byte[] {});
! 
!     dataService0Proxy.rangeCount(IDataService.UNISOLATED, indexName, null,
!             null);
  
      }
  
! }
--- 331,373 ----
      /*
!      * Request the index partition metadata for the initial partition of the
!      * scale-out index.
       */
!     IPartitionMetadata pmd;
!     {
! 
!         byte[] val = metadataServiceProxy.lookup(IDataService.UNISOLATED,
!                 MetadataService.getMetadataName(indexName), new byte[] {});
! 
!         assertNotNull(val);
! 
!         pmd = (IPartitionMetadata) SerializerUtil.deserialize(val);
! 
!     }
  
+     /*
+      * Resolve the data service to which the initial index partition was
+      * mapped and verify that we can invoke an operation on that index on
+      * that data service (i.e., that the data service recognizes that it
+      * has an index registered by that name).
+      */
+     {
+ 
+         ServiceID serviceID = MetadataServer.uuid2ServiceID(pmd.getDataServices()[0]);
+ 
+         // @todo use lookup cache in a real client.
+         IDataService proxy = lookupDataService(serviceID);
+ 
+         /*
+          * Note: this will throw an exception if the index is not registered
+          * with this data service.
+          */
+         assertEquals("rangeCount", 0, proxy.rangeCount(
+                 IDataService.UNISOLATED, indexName, null, null));
+ 
+     }
  
      }
  
! }
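The test above resolves a data service by converting the `UUID` stored in the partition metadata back into a Jini `ServiceID` via `MetadataServer.uuid2ServiceID`, the inverse of the msb/lsb packing done when the partition was created. Since `net.jini.core.lookup.ServiceID` is not on a default classpath, the sketch below uses a minimal stand-in class with the same two-longs shape; the class and method names here are illustrative, not bigdata's actual API.

```java
import java.util.UUID;

public class UuidServiceIdRoundTrip {

    // Minimal stand-in for net.jini.core.lookup.ServiceID, which (for this
    // sketch) we assume is just a 128-bit identifier held as two longs.
    static final class FakeServiceID {
        final long msb, lsb;
        FakeServiceID(long msb, long lsb) { this.msb = msb; this.lsb = lsb; }
        long getMostSignificantBits() { return msb; }
        long getLeastSignificantBits() { return lsb; }
    }

    // Pack a service id into a java.util.UUID, mirroring how the data
    // service is recorded in the partition metadata.
    static UUID serviceID2uuid(FakeServiceID id) {
        return new UUID(id.getMostSignificantBits(), id.getLeastSignificantBits());
    }

    // The inverse conversion, analogous to MetadataServer.uuid2ServiceID.
    static FakeServiceID uuid2ServiceID(UUID uuid) {
        return new FakeServiceID(uuid.getMostSignificantBits(),
                uuid.getLeastSignificantBits());
    }

    public static void main(String[] args) {
        UUID original = UUID.randomUUID();
        FakeServiceID id = uuid2ServiceID(original);
        UUID roundTripped = serviceID2uuid(id);
        // All 128 bits survive the round trip unchanged.
        if (!original.equals(roundTripped))
            throw new AssertionError("round trip failed");
        System.out.println("round trip ok");
    }
}
```

Because both types are plain 128-bit values, the conversion is lossless in either direction, which is what lets the metadata index store plain `UUID`s while clients still resolve Jini proxies.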
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service
In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv21228/src/java/com/bigdata/service

Modified Files:
	AbstractServer.java IDataService.java MetadataService.java
	DataService.java MetadataServer.java IMetadataService.java
Log Message:
Working on registration of scale-out index with the metadata service - commit prior to refactor to use versioning in the MetadataIndex class.

Index: DataService.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataService.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -C2 -d -r1.10 -r1.11
*** DataService.java	21 Apr 2007 10:37:16 -0000	1.10
--- DataService.java	23 Apr 2007 17:22:14 -0000	1.11
***************
*** 58,61 ****
--- 58,63 ----
  import java.util.concurrent.Executors;
  
+ import org.apache.log4j.Logger;
+ 
  import com.bigdata.btree.BatchContains;
  import com.bigdata.btree.BatchInsert;
***************
*** 75,78 ****
--- 77,81 ----
  import com.bigdata.journal.Journal;
  import com.bigdata.util.concurrent.DaemonThreadFactory;
+ import com.sun.corba.se.impl.orbutil.closure.Future;
  
  /**
***************
*** 102,105 ****
--- 105,111 ----
   * an NIO interface to the data service.
   * 
+  * @todo make sure that all service methods that create a {@link Future} do a
+  *       get() so that the will block until the serialized task actually runs.
+  * 
   * @todo Note that "auto-commit" is provided for unisolated writes. This relies
   *       on two things. First, only the {@link UnisolatedBTree} recoverable from
***************
*** 188,191 ****
--- 194,200 ----
      protected Journal journal;
  
+     public static final transient Logger log = Logger
+             .getLogger(DataService.class);
+ 
      /**
***************
*** 370,382 ****
      }
  
!     public void registerIndex(String name,UUID indexUUID) {
  
!         journal.serialize(new RegisterIndexTask(name,indexUUID));
  
      }
  
!     public void dropIndex(String name) {
  
!         journal.serialize(new DropIndexTask(name));
  
      }
--- 379,400 ----
      }
  
!     public void registerIndex(String name,UUID indexUUID) throws IOException, InterruptedException, ExecutionException{
  
!         journal.serialize(new RegisterIndexTask(name,indexUUID)).get();
  
      }
  
!     public void dropIndex(String name) throws IOException, InterruptedException, ExecutionException {
  
!         journal.serialize(new DropIndexTask(name)).get();
! 
!     }
! 
!     public byte[] lookup(long tx, String name, byte[] key) throws IOException,
!             InterruptedException, ExecutionException {
! 
!         byte[][] vals = batchLookup(tx, name, 1, new byte[][]{key});
! 
!         return vals[0];
  
      }
***************
*** 632,636 ****
       * @version $Id$
       */
!     private abstract class AbstractIndexManagementTask implements Callable<Object> {
  
          protected final String name;
--- 650,654 ----
       * @version $Id$
       */
!     protected abstract class AbstractIndexManagementTask implements Callable<Object> {
  
          protected final String name;
***************
*** 646,650 ****
      }
  
!     private class RegisterIndexTask extends AbstractIndexManagementTask {
  
          protected final UUID indexUUID;
--- 664,668 ----
      }
  
!     protected class RegisterIndexTask extends AbstractIndexManagementTask {
  
          protected final UUID indexUUID;
***************
*** 677,683 ****
          }
  
!         ndx = journal.registerIndex(name, new UnisolatedBTree(journal, indexUUID));
  
          journal.commit();
  
          return ndx;
--- 695,704 ----
          }
  
!         ndx = journal.registerIndex(name, new UnisolatedBTree(journal,
!                 indexUUID));
  
          journal.commit();
+ 
+         log.info("registeredIndex: "+name+", indexUUID="+indexUUID);
  
          return ndx;
***************
*** 687,691 ****
      }
  
!     private class DropIndexTask extends AbstractIndexManagementTask {
  
          public DropIndexTask(String name) {
--- 708,712 ----
      }
  
!     protected class DropIndexTask extends AbstractIndexManagementTask {
  
          public DropIndexTask(String name) {
***************
*** 715,719 ****
       * @version $Id$
       */
!     private abstract class AbstractBatchTask implements Callable<Object> {
  
          private final String name;
--- 736,740 ----
       * @version $Id$
       */
!     protected abstract class AbstractBatchTask implements Callable<Object> {
  
          private final String name;
***************
*** 788,792 ****
       * to block.
       */
!     private class TxBatchTask extends AbstractBatchTask {
  
          private final ITx tx;
--- 809,813 ----
       * to block.
       */
!     protected class TxBatchTask extends AbstractBatchTask {
  
          private final ITx tx;
***************
*** 828,832 ****
       * @version $Id$
       */
!     private class UnisolatedReadBatchTask extends AbstractBatchTask {
  
          public UnisolatedReadBatchTask(String name, IBatchOp op) {
--- 849,853 ----
       * @version $Id$
       */
!     protected class UnisolatedReadBatchTask extends AbstractBatchTask {
  
          public UnisolatedReadBatchTask(String name, IBatchOp op) {
***************
*** 854,858 ****
       * @version $Id$
       */
!     private class UnisolatedBatchReadWriteTask extends UnisolatedReadBatchTask {
  
          public UnisolatedBatchReadWriteTask(String name, IBatchOp op) {
--- 875,879 ----
       * @version $Id$
       */
!     protected class UnisolatedBatchReadWriteTask extends UnisolatedReadBatchTask {
  
          public UnisolatedBatchReadWriteTask(String name, IBatchOp op) {
***************
*** 889,893 ****
      }
  
!     private class RangeCountTask implements Callable<Object> {
  
          // startTime or 0L iff unisolated.
--- 910,914 ----
      }
  
!     protected class RangeCountTask implements Callable<Object> {
  
          // startTime or 0L iff unisolated.
***************
*** 970,974 ****
      }
  
!     private class RangeQueryTask implements Callable<Object> {
  
          // startTime or 0L iff unisolated.
--- 991,995 ----
      }
  
!     protected class RangeQueryTask implements Callable<Object> {
  
          // startTime or 0L iff unisolated.
***************
*** 1078,1082 ****
       * @version $Id$
       */
!     private abstract class AbstractProcedureTask implements Callable<Object> {
  
          protected final IProcedure proc;
--- 1099,1103 ----
       * @version $Id$
       */
!     protected abstract class AbstractProcedureTask implements Callable<Object> {
  
          protected final IProcedure proc;
***************
*** 1113,1117 ****
       * to block.
       */
!     private class TxProcedureTask extends AbstractProcedureTask {
  
          private final ITx tx;
--- 1134,1138 ----
       * to block.
       */
!     protected class TxProcedureTask extends AbstractProcedureTask {
  
          private final ITx tx;
***************
*** 1153,1157 ****
       * @version $Id$
       */
!     private class UnisolatedReadProcedureTask extends AbstractProcedureTask {
  
          public UnisolatedReadProcedureTask(IProcedure proc) {
--- 1174,1178 ----
       * @version $Id$
       */
!     protected class UnisolatedReadProcedureTask extends AbstractProcedureTask {
  
          public UnisolatedReadProcedureTask(IProcedure proc) {
***************
*** 1179,1183 ****
       * @version $Id$
       */
!     private class UnisolatedReadWriteProcedureTask extends UnisolatedReadProcedureTask {
  
          public UnisolatedReadWriteProcedureTask(IProcedure proc) {
--- 1200,1204 ----
       * @version $Id$
       */
!     protected class UnisolatedReadWriteProcedureTask extends UnisolatedReadProcedureTask {
  
          public UnisolatedReadWriteProcedureTask(IProcedure proc) {

Index: IDataService.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IDataService.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -C2 -d -r1.10 -r1.11
*** IDataService.java	21 Apr 2007 10:37:16 -0000	1.10
--- IDataService.java	23 Apr 2007 17:22:14 -0000	1.11
***************
*** 143,147 ****
       * are mapped onto the {@link DataService}.
       */
!     public void registerIndex(String name,UUID uuid) throws IOException;
  
      /**
--- 143,148 ----
       * are mapped onto the {@link DataService}.
       */
!     public void registerIndex(String name, UUID uuid) throws IOException,
!             InterruptedException, ExecutionException;
  
      /**
***************
*** 154,159 ****
       * if <i>name</i> does not identify a registered index.
       */
!     public void dropIndex(String name) throws IOException;
! 
      /**
       * <p>
--- 155,179 ----
       * if <i>name</i> does not identify a registered index.
       */
!     public void dropIndex(String name) throws IOException,
!             InterruptedException, ExecutionException;
! 
!     /**
!      * Point lookup.
!      * 
!      * @param tx
!      * @param name
!      * @param key
!      * @return The value for that key (may be null) and <code>null</code> if
!      *         there is no value for that key.
!      * @throws IOException
!      * @throws InterruptedException
!      * @throws ExecutionException
!      * 
!      * @todo consider simply encapsulating in the client library since this just
!      *       wraps a batchLookup operation.
!      */
!     public byte[] lookup(long tx, String name, byte[] key) throws IOException,
!             InterruptedException, ExecutionException;
! 
      /**
       * <p>

Index: MetadataService.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MetadataService.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -C2 -d -r1.5 -r1.6
*** MetadataService.java	23 Apr 2007 13:09:29 -0000	1.5
--- MetadataService.java	23 Apr 2007 17:22:14 -0000	1.6
***************
*** 48,57 ****
  package com.bigdata.service;
  
! import java.net.InetSocketAddress;
  import java.util.Properties;
  
  import com.bigdata.journal.Journal;
  import com.bigdata.scaleup.MasterJournal;
  import com.bigdata.scaleup.MetadataIndex;
  
  /**
--- 48,63 ----
  package com.bigdata.service;
  
! import java.io.IOException;
  import java.util.Properties;
+ import java.util.UUID;
+ import java.util.concurrent.ExecutionException;
+ 
+ import net.jini.core.lookup.ServiceID;
+ 
+ import com.bigdata.btree.BTree;
  import com.bigdata.journal.Journal;
  import com.bigdata.scaleup.MasterJournal;
  import com.bigdata.scaleup.MetadataIndex;
+ import com.bigdata.scaleup.PartitionMetadata;
  
  /**
***************
*** 78,101 ****
   * be able to access the historical state of the metadata index that
   * corresponds to the commit time of interest for the database.)
   */
! public class MetadataService extends DataService implements IMetadataService, IServiceShutdown {
  
      /**
!      * The name of the journal on which the metadata index is stored.
       * 
!      * @todo support two-tier metadata index and reconcile with
!      *       {@link MetadataIndex} and {@link MasterJournal}.
       */
!     protected Journal journal;
  
!     public MetadataService(Properties properties) {
  
          super(properties);
  
      }
  
!     public InetSocketAddress getDataService(String name,byte[] key) {
!         // TODO Auto-generated method stub
!         return null;
      }
--- 84,242 ----
   * be able to access the historical state of the metadata index that
   * corresponds to the commit time of interest for the database.)
+  * 
+  * @todo support two-tier metadata index and reconcile with
+  *       {@link MetadataIndex} and {@link MasterJournal}.
   */
! abstract public class MetadataService extends DataService implements
!         IMetadataService, IServiceShutdown {
! 
!     protected MetadataService(Properties properties) {
! 
!         super(properties);
! 
!     }
! 
!     /*
!      * @todo Support creation and management of scale-out indices, including
!      * mapping their index partitions to data services. Build out this
!      * functionality with a series of test cases that invoke the basic
!      * operations (registerIndex, getPartition, getPartitions, movePartition,
!      * etc.) and handle the load-balancing later.
!      */
  
      /**
!      * @todo if if exits already?
       * 
!      * @todo index metadata options (unicode support, per-partition counters,
!      *       etc.) i had been passing in the BTree instance, but that does
!      *       not work as well in a distributed environment.
!      * 
!      * @todo refactor so that the {@link MetadataIndex} can be used with a
!      *       normal {@link Journal}
!      * 
!      * @todo create the initial partition and assign to the "least used" data
!      *       service (the data service impl needs to aggregate events and log
!      *       them in a manner that gets noticed by the metadata service).
       */
!     public UUID registerIndex(String name) throws IOException, InterruptedException, ExecutionException {
! 
!         MetadataIndex mdi = (MetadataIndex) journal.serialize(
!                 new RegisterMetadataIndexTask(name)).get();
! 
!         UUID managedIndexUUID = mdi.getManagedIndexUUID();
! 
!         return managedIndexUUID;
! 
!     }
! 
!     /**
!      * Return the name of the metadata index.
!      * 
!      * @param indexName
!      *            The name of the scale-out index.
!      * 
!      * @return The name of the corresponding {@link MetadataIndex} that is used
!      *         to manage the partitions in the named scale-out index.
!      */
!     public static String getMetadataName(String indexName) {
! 
!         return "metadata-"+indexName;
  
      }
  
!     /**
!      * Registers a metadata index for a named scale-out index and creates the
!      * initial partition for the scale-out index on a {@link DataService}.
!      * 
!      * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
!      * @version $Id$
!      */
!     protected class RegisterMetadataIndexTask extends AbstractIndexManagementTask {
! 
!         public RegisterMetadataIndexTask(String name) {
! 
!             super(name);
! 
!         }
! 
!         public Object call() throws Exception {
! 
!             // the name of the metadata index itself.
!             final String metadataName = getMetadataName(name);
! 
!             // make sure there is no metadata index for that btree.
!             if( journal.getIndex(metadataName) != null ) {
! 
!                 throw new IllegalStateException("Already registered: name="
!                         + name);
! 
!             }
! 
!             /*
!              * Note: there are two UUIDs here - the UUID for the metadata index
!              * describing the partitions of the named scale-out index and the UUID
!              * of the named scale-out index. The metadata index UUID MUST be used by
!              * all B+Tree objects having data for the metadata index (its mutable
!              * btrees on journals and its index segments) while the managed named
!              * index UUID MUST be used by all B+Tree objects having data for the
!              * named index (its mutable btrees on journals and its index segments).
!              */
! 
!             final UUID metadataIndexUUID = UUID.randomUUID();
! 
!             final UUID managedIndexUUID = UUID.randomUUID();
! 
!             MetadataIndex mdi = new MetadataIndex(journal,
!                     BTree.DEFAULT_BRANCHING_FACTOR, metadataIndexUUID,
!                     managedIndexUUID, name);
! 
!             /*
!              * Setup the initial partition which is able to accept any key.
!              */
! 
!             ServiceID dataServiceID = getUnderUtilizedDataService();
! 
!             UUID dataServiceUUID = new UUID(dataServiceID
!                     .getMostSignificantBits(), dataServiceID
!                     .getLeastSignificantBits());
! 
!             final UUID[] dataServices = new UUID[]{
!                 dataServiceUUID
!             };
! 
!             mdi.put(new byte[]{}, new PartitionMetadata(0, dataServices ));
! 
!             journal.commit();
! 
!             /*
!              * Create the initial partition of the scale-out index on the
!              * selected data service.
!              * 
!              * FIXME This must be done using a restart-safe operation such that
!              * the partition is either eventually created or the operation is
!              * retracted and the partition is created on a different data
!              * service. Note that this is a high-latency remote operation and
!              * MUST NOT be run inside of the serialized write on the metadata
!              * index itself. It is a good question exactly when this operation
!              * should be run....
!              */
! 
!             IDataService dataService = getDataServiceByID(dataServiceID);
! 
!             if(dataService==null) {
! 
!                 throw new RuntimeException("Condition is not supported");
! 
!             }
! 
!             /*
!              * Register the index on the target data service (remote operation).
!              */
!             dataService.registerIndex(name, managedIndexUUID);
! 
!             return mdi;
! 
!         }
! 
!     }

Index: MetadataServer.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MetadataServer.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** MetadataServer.java	23 Apr 2007 13:09:29 -0000	1.3
--- MetadataServer.java	23 Apr 2007 17:22:14 -0000	1.4
***************
*** 94,103 ****
      private ServiceDiscoveryManager serviceDiscoveryManager = null;
  
!     private LookupCache lookupCache = null;
  
      private Map<ServiceID, ServiceItem> serviceIdMap = new ConcurrentHashMap<ServiceID, ServiceItem>();
  
!     protected LookupCache getLookupCache() {
  
!         return lookupCache;
  
      }
--- 94,103 ----
      private ServiceDiscoveryManager serviceDiscoveryManager = null;
  
!     private LookupCache dataServiceLookupCache = null;
  
      private Map<ServiceID, ServiceItem> serviceIdMap = new ConcurrentHashMap<ServiceID, ServiceItem>();
  
!     protected LookupCache getDataServiceLookupCache() {
  
!         return dataServiceLookupCache;
  
      }
***************
*** 128,132 ****
      /*
!      * Setup a lookupCache that will be populated with all services that match a
       * filter. This is used to keep track of all data services registered
       * with any service registrar to which the metadata server is listening.
--- 128,132 ----
      /*
!      * Setup a dataServiceLookupCache that will be populated with all services that match a
       * filter. This is used to keep track of all data services registered
       * with any service registrar to which the metadata server is listening.
***************
*** 137,141 ****
              new Class[] { IDataService.class }, null);
  
!     lookupCache = serviceDiscoveryManager
              .createLookupCache(template, new DataServiceFilter() /* filter */, this /* ServiceDiscoveryListener */);
--- 137,141 ----
              new Class[] { IDataService.class }, null);
  
!     dataServiceLookupCache = serviceDiscoveryManager
              .createLookupCache(template, new DataServiceFilter() /* filter */, this /* ServiceDiscoveryListener */);
***************
*** 143,147 ****
      } catch(RemoteException ex) {
  
!         log.error("Could not setup lookup lookupCache", ex);
  
          System.exit(1);
--- 143,147 ----
      } catch(RemoteException ex) {
  
!         log.error("Could not setup lookup dataServiceLookupCache", ex);
  
          System.exit(1);
***************
*** 285,532 ****
  }
  
- //    /*
- //     * DiscoveryListener
- //     */
- //
- //    /**
- //     * Extends base implementation to register for notification of
- //     * {@link DataService} join/leave events
- //     * 
- //     * @todo this must be a low-latency handler.
- //     * 
- //     * @todo pay attention iff registrar is for a group that is used by this
- //     *       metadata service as configured.
- //     * 
- //     * @todo figure out delta in registrars and in group-to-registrar mapping.
- //     * 
- //     * @todo register for join/leave events for {@link IDataService} on each new
- //     *       registrar.
- //     * 
- //     * @todo task a worker to query the new registrar(s) for any existing
- //     *       {@link IDataService}s and continue to query until all such
- //     *       services on the registrar have been discovered. This is the pool of
- //     *       {@link DataService}s that are available to the
- //     *       {@link MetadataService} for management.
- //     */
- //    public void discovered(DiscoveryEvent e) {
- //
- //        super.discovered(e);
- //
- //        ServiceRegistrar[] registrars = e.getRegistrars();
- //
- //        Map<Integer,String[]> registrarToGroups = (Map<Integer,String[]>) e.getGroups();
- //
- //        final int nregistrars = registrars.length;
- //
- //        log.info("Reported: "+nregistrars+" registrars");
- //
- //        ServiceTemplate template = new ServiceTemplate(null,
- //                new Class[] { IDataService.class }, null);
- //
- //        final long leaseDuration = 5000;
- //
- //        final MarshalledObject handbackObject;
- //
- //        try {
- //
- //            handbackObject = new MarshalledObject("handback");
- //
- //        } catch (IOException ex) {
- //
- //            // the string must be serializable....
- //            throw new AssertionError(ex);
- //
- //        }
- //
- //        for(int i=0; i<nregistrars; i++) {
- //
- //            ServiceRegistrar registrar = registrars[i];
- //
- //            ServiceMatches matches = getDataServices(registrar);
- //
- //            log.info("Reported: "+matches.totalMatches+" data services on registrar");
- //
- //            /*
- //             * Note: This is a persistent registration.
- //             * 
- //             * @todo share a single remote listener object for all registered
- //             * events?
- //             * 
- //             * @todo match all appropriate transitions.
- //             * 
- //             * @todo explore uses for the handback object - should there be one
- //             * per registrar? it can serve as a key that identifies the
- //             * registrar....
- //             * 
- //             * @todo the last argument is a lease duration. we will have to
- //             * renew the lease. see what management classes exist to make this
- //             * easier ( LeaseRenewalManager ).
- //             */
- //            try {
- //
- //                EventRegistration reg = registrar.notify(template,
- //                        ServiceRegistrar.TRANSITION_NOMATCH_MATCH // created.
- ////                        |ServiceRegistrar.TRANSITION_MATCH_MATCH // modified.
- //                        |ServiceRegistrar.TRANSITION_MATCH_NOMATCH // deleted.
- //                        , new NotifyListener(), handbackObject, leaseDuration);
- //
- //                System.err.println("EventRegistration: "+reg);
- //
- //            } catch (RemoteException ex) {
- //
- //                log.error("Could not register for notification", ex);
- //
- //            }
- //
- //        }
- //
- //    }
- //
- //    /**
- //     * @todo When a registrar is discarded, do we have to do anything? If the
- //     *       registrar was providing redundency, then we can still reach the
- //     *       various data services. If the registrar was the sole resolver for
- //     *       some services then those services are no longer available -- and
- //     *       probably an WARNing should be logged.
- //     */
- //    public void discarded(DiscoveryEvent e) {
- //
- //        super.discarded(e);
- //
- //    }
- //
- //    /**
- //     * RemoteEventListener - events are generated by our registered persistent
- //     * notification with one or more service registrars that we use to listen
- //     * for join/leave of data services.
- //     * <p>
- //     * This class extends {@link UnicastRemoteObject} so that it can run in the
- //     * {@link ServiceRegistrar}. The class MUST be located in the unpacked JAR
- //     * identified by the <em>codebase</em>.
- //     * 
- //     * @todo I have not figured out yet how to get the events back to the
- //     *       {@link MetadataServer} - they are being written in the console in
- //     *       which jini is running since they are received remotely and then
- //     *       need to be passed back to the {@link MetadataServer} somehow.
- //     * 
- //     * @todo perhaps pass in the {@link ServiceRegistrar} or the
- //     *       {@link MarshalledObject} so that we can identify the service for
- //     *       which the event was generated and pass the event to code on the
- //     *       {@link MetadataServer} instance (transient reference?) that will
- //     *       actually handle the event (notice the join/leave of a data
- //     *       service).
- //     * 
- //     * @see http://archives.java.sun.com/cgi-bin/wa?A2=ind0410&L=jini-users&D=0&P=30410
- //     * 
- //     * @see http://archives.java.sun.com/cgi-bin/wa?A2=ind0410&L=jini-users&D=0&P=29391
- //     * 
- //     * @see ServiceDiscoveryManager which can encapsulate the entire problem of
- //     *      listening and also enumerating the existing services. However, its
- //     *      use is limited by NAT (it will not cross a filewall).
- //     */
- //    public static class NotifyListener extends UnicastRemoteObject implements RemoteEventListener {
- //
- //        /**
- //         * 
- //         */
- //        private static final long serialVersionUID = -5847172051441883860L;
- //
- //        public NotifyListener() throws RemoteException {
- //            super();
- //        }
- //
- //        public NotifyListener(int port) throws RemoteException {
- //            super(port);
- //        }
- //
- //        /**
- //         * 
- //         * @param e
- //         * @throws UnknownEventException
- //         * @throws RemoteException
- //         */
- //        public void notify(RemoteEvent e) throws UnknownEventException, RemoteException {
- //
- //            System.err.println("notify(RemoveEvent:"+e+")");
- //            log.info(e.toString());
- //
- //        }
- //
- //    }
- 
- //    /**
- //     * Return an {@link IMetadataService}.
- //     * 
- //     * @param registrar
- //     *            A service registrar to query.
- //     * 
- //     * @return An {@link IMetadataService} if one was found using that
- //     *         registrar.
- //     */
- //    public IMetadataService getMetadataService(ServiceRegistrar registrar) {
- //
- //        Class[] classes = new Class[] {IMetadataService.class};
- //
- //        ServiceTemplate template = new ServiceTemplate(null, classes, null);
- //
- //        IMetadataService proxy = null;
- //
- //        try {
- //
- //            proxy = (IMetadataService) registrar.lookup(template);
- //
- //        } catch(java.rmi.RemoteException e) {
- //
- //            log.warn(e);
- //
- //        }
- //
- //        return proxy;
- //
- //    }
- //
- //    /**
- //     * Return the data service(s) matched on this registrar.
- //     * 
- //     * @param registrar
- //     *            The {@link ServiceRegistrar} to be queried.
- //     * 
- //     * @return The data service or <code>null</code> if none was matched.
- //     * 
- //     * @todo we need to describe the services to be discovered by their primary
- //     *       interface and only search within a designated group that
- //     *       corresponds to the bigdata federation of interest - that group is
- //     *       part of the client configuration.
- //     * 
- //     * @todo we need to filter out matches on {@link MetadataService} since it
- //     *       extends {@link DataService}.
- //     * 
- //     * @todo how do we ensure that we have seen all data services? If we query
- //     *       each registrar as it is discovered and then register for updates
- //     *       there are two ways in which we could miss some instances: (1) new
- //     *       data services register between the query and the registration for
- //     *       updates; and (2) the query will not return _ALL_ data services
- //     *       registered, but only as match as the match limit.
- //     */
- //    public ServiceMatches getDataServices(ServiceRegistrar registrar) {
- //
- //        Class[] classes = new Class[] {IDataService.class};
- //
- //        ServiceTemplate template = new ServiceTemplate(null, classes, null);
- //
- //        try {
- //
- //            return registrar.lookup(template,0);
- //
- //        } catch(java.rmi.RemoteException e) {
- //
- //            log.warn(e);
- //
- //            return null;
- //
- //        }
- //
- //    }
- //
      /**
       * Extends the behavior to terminate {@link LookupCache} and
--- 285,288 ----
  }
  
      /**
       * Extends the behavior to terminate {@link LookupCache} and
***************
*** 535,539 ****
      public void destroy() {
  
!         lookupCache.terminate();
  
          serviceDiscoveryManager.terminate();
--- 291,295 ----
      public void destroy() {
  
!         dataServiceLookupCache.terminate();
  
          serviceDiscoveryManager.terminate();
***************
*** 552,561 ****
          implements Remote, RemoteAdministrable, RemoteDestroyAdmin {
  
!         protected AbstractServer server;
  
          /**
           * @param properties
           */
!         public AdministrableMetadataService(AbstractServer server, Properties properties) {
  
              super(properties);
--- 308,317 ----
          implements Remote, RemoteAdministrable, RemoteDestroyAdmin {
  
!         protected MetadataServer server;
  
          /**
           * @param properties
           */
!         public AdministrableMetadataService(MetadataServer server, Properties properties) {
  
              super(properties);
***************
*** 573,576 ****
--- 329,354 ----
          }
  
+         /**
+          * Return the UUID of an under utilized data service.
+          * 
+          * @todo this is just an arbitrary instance and does not consider
+          *       utilization.
+          */
+         public ServiceID getUnderUtilizedDataService() throws IOException {
+ 
+             ServiceItem item = server.dataServiceLookupCache.lookup(null);
+ 
+             log.info(item.toString());
+ 
+             return item.serviceID;
+ 
+         }
+ 
+         public IDataService getDataServiceByID(ServiceID serviceID) throws IOException {
+ 
+             return (IDataService)server.getDataServiceByID(serviceID).service;
+ 
+         }
+ 
          /*
           * DestroyAdmin

Index: AbstractServer.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/AbstractServer.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -C2 -d -r1.7 -r1.8
*** AbstractServer.java	23 Apr 2007 13:09:29 -0000	1.7
--- AbstractServer.java	23 Apr 2007 17:22:13 -0000	1.8
***************
*** 207,215 ****
--- 207,219 ----
  
      protected DiscoveryManagement getDiscoveryManagement() {
+ 
          return discoveryManager;
+ 
      }
  
      protected JoinManager getJoinManager() {
+ 
          return joinManager;
+ 
      }
***************
*** 519,544 ****
      public void notify(LeaseRenewalEvent event) {
  
!         log.error("Lease could not be renewed: " + event);
  
      }
  
- //    /*
- //     * DiscoveryListener
- //     */
- //
- //    /**
- //     * NOP.
- //     */
- //    public void discovered(DiscoveryEvent arg0) {
- //        log.info("DiscoveryListener.discovered: "+arg0);
- //    }
- //
- //    /**
- //     * NOP.
- //     */
- //    public void discarded(DiscoveryEvent arg0) {
- //        log.info("DiscoveryListener.discarded: "+arg0);
- //    }
- 
      /**
       * Shutdown the server taking time only to unregister it from jini.
--- 523,530 ----
      public void notify(LeaseRenewalEvent event) {
  
!         log.warn("Lease could not be renewed: " + event);
  
      }
  
      /**
       * Shutdown the server taking time only to unregister it from jini.
***************
*** 794,836 ****
  abstract protected Remote newService(Properties properties);
  
- //    /**
- //     * The remote service implementation object. This implements the
- //     * {@link Remote} interface and uses JERI to create a proxy for the remote
- //     * object and configure and manage the protocol for communications between
- //     * the client (service proxy) and the remote object (the service
- //     * implementation).
- //     * <p>
- //     * Note: You have to implement {@link JoinAdmin} in order to show up as an
- //     * administerable service (blue folder) in the jini Service Browser.
- //     * 
- //     * @version $Id$
- //     * @author <a href="mailto:tho...@us...">Bryan Thompson
- //     *         </a>
- //     */
- //    public static class TestServiceImpl implements ITestService
- //    {
- //
- //        /**
- //         * Service constructor.
- //         * 
- //         * @param properties
- //         */
- //        public TestServiceImpl(Properties properties) {
- //
- //            log.info("Created: " + this );
- //
- //            new Journal(properties);
- //
- //        }
- //
- //        public void invoke() {
- //
- //            log.info("invoked: "+this);
- //
- //        }
- //
- //    }
- 
  /*
   * Note: You need to extend Remote in order for these APIs to be exported!
--- 780,783 ----

Index: IMetadataService.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IMetadataService.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -C2 -d -r1.5 -r1.6
*** IMetadataService.java	23 Apr 2007 13:09:29 -0000	1.5
--- IMetadataService.java	23 Apr 2007 17:22:14 -0000	1.6
***************
*** 49,54 ****
  import java.io.IOException;
! import java.net.InetSocketAddress;
! import java.rmi.Remote;
  
  /**
--- 49,57 ----
  import java.io.IOException;
! import java.util.UUID;
! import java.util.concurrent.ExecutionException;
! 
! import net.jini.core.lookup.ServiceID;
! import net.jini.core.lookup.ServiceItem;
  
  /**
***************
*** 68,89 ****
  public interface IMetadataService extends IDataService {
  
      /**
!      * Return the address of the {@link IDataService} that has current primary
!      * responsibility for the index partition that includes the specified key.
!      * 
!      * @param key
!      *            The key.
       * 
!      * @return The locator for the {@link IDataService} with primary
!      *         responsibility for the index partition in which that key would be
!      *         located.
       * 
!      * @todo return primary and secondary data service locators with lease.
       * 
!      * @todo return primary and secondary data service locators with lease for
!      *       the index partition that would contain the key plus some number of
!      *       index partitions surrounding that partition.
       */
!     public InetSocketAddress getDataService(String name, byte[] key) throws IOException;
! 
  }
--- 71,151 ----
  public interface IMetadataService extends IDataService {
  
+ //    /**
+ //     * Return the partition metadata for the index partition that includes the
+ //     * specified key.
+ //     * 
+ //     * @param key
+ //     *            The key.
+ //     * 
+ //     * @return The metadata index partition in which that key is or would be
+ //     *         located.
+ //     * 
+ //     * @todo return lease for the index partition that would contain the key.
+ //     * 
+ //     * @todo abstract away from Jini so that we can support other fabrics
+ //     *       (OSGi/SCA).
+ //     * 
+ //     * @todo Either the client or the metadata service should support
+ //     *       pre-caching of some number of index partitions surrounding that
+ //     *       partition.
+ //     * 
+ //     * @todo do a variant that supports a key range - this should really just be
+ //     *       the same as
+ //     *       {@link IDataService#rangeQuery(long, String, byte[], byte[], int, int)}
+ //     *       with the client addressing the metadata index rather than the data
+ //     *       index (likewise for this method as well).
+ //     * 
+ //     * @todo update the {@link PartitionMetadata} data model to reflect a single
+ //     *       point of responsibility with a media replication chain for
+ //     *       failover. Either this method or a variant method needs to return
+ //     *       the partition metadata itself so that {@link DataService}s can
+ //     *       configure their downstream media replication pipelines.
+ //     */
+ //    public PartitionMetadata getPartition(String name, byte[] key) throws IOException;
+ //
+ 
+     /*
+      * methods that require access to the metadata server for their
+      * implementations.
+      */
+ 
      /**
!      * Return the UUID of an under utilized data service.
!      */
!     public ServiceID getUnderUtilizedDataService() throws IOException;
! 
!     /**
!      * Return the proxy for a {@link IDataService} from the local cache.
       * 
!      * @param serviceID
!      *            The {@link ServiceID} for the {@link DataService}.
!      * 
!      * @return The proxy or <code>null</code> if the {@link ServiceID} is not
!      *         mapped to a {@link ServiceItem} for a known {@link DataService}
!      *         by the local cache.
!      * 
!      * @throws IOException
       */
!     public IDataService getDataServiceByID(ServiceID serviceID)
!             throws IOException;
! 
!     /*
!      * methods that do not require direct access to the metadata server for
!      * their implementation.
!      */
! 
!     /**
!      * Register a scale-out index. The index will automatically be assigned to a
!      * {@link DataService} for its initial partition. As the index grows, the
!      * initial partition will be split and the various partitions may be
!      * re-distributed among the available {@link DataService}s.
!      * 
!      * @param name
!      *            The index name.
!      * 
!      * @return The UUID for that index.
!      */
!     public UUID registerIndex(String name) throws IOException,
InterruptedException, ExecutionException; ! } |
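The IMetadataService additions in the diff above describe a flow: a client calls registerIndex(name), the metadata service assigns the initial partition to an under-utilized data service, and getDataServiceByID() resolves services from a local cache. A minimal Jini-free sketch of that flow follows; all class and field names here are hypothetical stand-ins (no net.jini types), and "under utilized" is approximated by round-robin, exactly as the @todo in the diff itself concedes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: illustrates the registerIndex()/getUnderUtilizedDataService()
// contract from the IMetadataService diff, not the actual implementation.
class MetadataServiceSketch {

    private final List<UUID> dataServiceIds = new ArrayList<>();
    private final ConcurrentHashMap<String, UUID> indexUUIDs = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, UUID> indexHomes = new ConcurrentHashMap<>();
    private int next = 0; // round-robin cursor

    MetadataServiceSketch(List<UUID> dataServiceIds) {
        this.dataServiceIds.addAll(dataServiceIds);
    }

    /** Arbitrary round-robin choice standing in for real utilization tracking. */
    synchronized UUID getUnderUtilizedDataService() {
        return dataServiceIds.get(next++ % dataServiceIds.size());
    }

    /** Register a scale-out index; its initial partition lands on one data service. */
    UUID registerIndex(String name) {
        UUID indexUUID = indexUUIDs.computeIfAbsent(name, n -> UUID.randomUUID());
        indexHomes.computeIfAbsent(name, n -> getUnderUtilizedDataService());
        return indexUUID;
    }

    /** The data service holding the initial partition, or null if unregistered. */
    UUID getDataServiceForIndex(String name) {
        return indexHomes.get(name);
    }
}
```

Re-registering the same name returns the same index UUID, which matches the idempotent "return the UUID for that index" contract in the interface javadoc.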
From: Bryan T. <tho...@us...> - 2007-04-23 17:22:21
Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv21228/src/test/com/bigdata/service Modified Files: TestMetadataServer0.java Log Message: Working on registration of scale-out index with the metadata service - commit prior to refactor to use versioning in the MetadataIndex class. Index: TestMetadataServer0.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/service/TestMetadataServer0.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TestMetadataServer0.java 23 Apr 2007 13:09:30 -0000 1.1 --- TestMetadataServer0.java 23 Apr 2007 17:22:16 -0000 1.2 *************** *** 48,51 **** --- 48,57 ---- package com.bigdata.service; + import java.rmi.RemoteException; + import java.util.UUID; + import java.util.concurrent.ExecutionException; + + import com.bigdata.scaleup.PartitionMetadata; + import net.jini.core.lookup.ServiceID; *************** *** 71,76 **** --- 77,92 ---- } + /** + * Starts in {@link #setUp()}. + */ MetadataServer metadataServer0; + /** + * Starts in {@link #setUp()}. + */ DataServer dataServer1; + /** + * Must be started by the test. + */ + DataServer dataServer0; /** *************** *** 125,130 **** --- 141,189 ---- dataServer1.destroy(); + if(dataServer0!=null) { + + destroyDataServer0(); + + } + + } + + /** + * Start data service 0. + */ + protected void startDataServer0() { + + assert dataServer0 == null; + + dataServer0 = new DataServer( + new String[] { "src/resources/config/standalone/DataServer0.config" }); + + new Thread() { + + public void run() { + + dataServer0.run(); + + } + + }.start(); + } + + /** + * Destroy data server 0. 
+ */ + protected void destroyDataServer0() { + assert dataServer0 != null; + + System.err.println("Destroying DataServer0"); + + dataServer0.destroy(); + + dataServer0 = null; + + } + /** * Test the ability to discover the {@link MetadataService} and the ability *************** *** 138,148 **** public void test_serverRunning() throws Exception { ServiceID dataService1ID = getServiceID(dataServer1); ServiceID metadataServiceID = getServiceID(metadataServer0); ! final IMetadataService proxy = (IMetadataService) lookupDataService(metadataServiceID); ! assertNotNull("service not discovered", proxy); /* --- 197,210 ---- public void test_serverRunning() throws Exception { + // wait for the service to be ready. ServiceID dataService1ID = getServiceID(dataServer1); + // wait for the service to be ready. ServiceID metadataServiceID = getServiceID(metadataServer0); ! // get proxy for this metadata service. ! final IMetadataService metadataServiceProxy = (IMetadataService) lookupDataService(metadataServiceID); ! assertNotNull("service not discovered", metadataServiceProxy); /* *************** *** 150,155 **** * discover it. */ - final DataServer dataServer0 = new DataServer( - new String[] { "src/resources/config/standalone/DataServer0.config" }); ServiceID dataService0ID = null; --- 212,215 ---- *************** *** 157,169 **** try { ! new Thread() { ! ! public void run() { ! ! dataServer0.run(); ! ! } ! ! }.start(); /* --- 217,221 ---- try { ! startDataServer0(); /* *************** *** 196,202 **** * server notices this event. */ ! System.err.println("Destroying DataServer0"); ! ! dataServer0.destroy(); if (dataService0ID != null) { --- 248,252 ---- * server notices this event. */ ! destroyDataServer0(); if (dataService0ID != null) { *************** *** 220,222 **** --- 270,346 ---- } + /** + * Registers a scale-out index and pre-partitions it to have data on each + * of two {@link DataService} instances. 
+ */ + public void test_registerScaleOutIndex() throws Exception { + + // wait for the service to be ready. + ServiceID dataService1ID = getServiceID(dataServer1); + + // wait for the service to be ready. + ServiceID metadataServiceID = getServiceID(metadataServer0); + + // get proxy for this metadata service. + final IMetadataService metadataServiceProxy = (IMetadataService) lookupDataService(metadataServiceID); + + assertNotNull("service not discovered", metadataServiceProxy); + + /* + * wait until we get the serviceID as an indication that the data + * service is running. + */ + + startDataServer0(); + + // wait for the service to be ready. + ServiceID dataService0ID = getServiceID(dataServer0); + + // lookup proxy for dataService0 + final IDataService dataService0Proxy = lookupDataService(dataService0ID); + + try { + /* + * This should fail since the index was never registered. + */ + dataService0Proxy.rangeCount(IDataService.UNISOLATED, "xyz", null, + null); + + } catch (ExecutionException ex) { + + System.err.println("cause="+ex.getCause()); + + assertTrue(ex.getCause() instanceof IllegalStateException); + + log.info("Ignoring expected exception: " + ex); + + } + + // + assertNotNull(metadataServiceProxy.getUnderUtilizedDataService()); + + /* + * register a scale-out index. + */ + final String indexName = "testIndex"; + + UUID indexUUID = metadataServiceProxy.registerIndex(indexName); + + log.info("Registered scale-out index: indexUUID="+indexUUID); + + /* + * @todo request the partition for the scale-out index, figure out the + * data service for that partition, and make sure that an index was + * created on that data service for the partition. + */ + + // @todo encapsulate in method to generate metadata index name. + byte[] val = metadataServiceProxy.lookup(IDataService.UNISOLATED, + MetadataService.getMetadataName(indexName), new byte[] {}); + + dataService0Proxy.rangeCount(IDataService.UNISOLATED, indexName, null, + null); + + } + } |
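The test above looks up the metadata index with an empty byte[] key: the metadata index maps each partition's separator key (the first key that may enter the partition, an empty byte[] for the initial partition) to that partition's metadata, so locating the partition for any application key is a floor lookup. A self-contained sketch of that lookup, assuming an unsigned lexicographic key order; the comparator and PartitionEntry record are illustrative, not the actual MetadataIndex classes.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

// Sketch of the separator-key lookup exercised by test_registerScaleOutIndex.
class MetadataIndexSketch {

    /** Unsigned lexicographic byte[] ordering, as B+Tree keys require. */
    static final Comparator<byte[]> UNSIGNED = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = (a[i] & 0xff) - (b[i] & 0xff);
            if (c != 0) return c;
        }
        return a.length - b.length;
    };

    /** Illustrative stand-in for the serialized partition metadata. */
    record PartitionEntry(int partitionId, UUID dataService) {}

    private final TreeMap<byte[], PartitionEntry> index = new TreeMap<>(UNSIGNED);

    void addPartition(byte[] separatorKey, PartitionEntry e) {
        index.put(separatorKey, e);
    }

    /** The partition whose separator key is the greatest one <= key. */
    PartitionEntry findPartition(byte[] key) {
        Map.Entry<byte[], PartitionEntry> e = index.floorEntry(key);
        return e == null ? null : e.getValue();
    }
}
```

With only the initial partition registered (separator key = empty byte[]), every key floors to partition 0, which is why the test's lookup with `new byte[] {}` finds the entry written at registration time.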
From: Bryan T. <tho...@us...> - 2007-04-23 17:22:21

Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/scaleup In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv21228/src/test/com/bigdata/scaleup Modified Files: TestMetadataIndex.java Log Message: Working on registration of scale-out index with the metadata service - commit prior to refactor to use versioning in the MetadataIndex class. Index: TestMetadataIndex.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/scaleup/TestMetadataIndex.java,v retrieving revision 1.15 retrieving revision 1.16 diff -C2 -d -r1.15 -r1.16 *** TestMetadataIndex.java 13 Apr 2007 15:04:24 -0000 1.15 --- TestMetadataIndex.java 23 Apr 2007 17:22:17 -0000 1.16 *************** *** 1027,1031 **** Journal store = new Journal(properties); ! final UUID[] dataServices = new UUID[]{UUID.randomUUID(),UUID.randomUUID()}; final UUID indexUUID = UUID.randomUUID(); --- 1027,1032 ---- Journal store = new Journal(properties); ! final UUID[] dataServices = new UUID[] { UUID.randomUUID(), ! UUID.randomUUID() }; final UUID indexUUID = UUID.randomUUID(); |
From: Bryan T. <tho...@us...> - 2007-04-23 17:22:20
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv21228/src/java/com/bigdata/scaleup Modified Files: PartitionMetadata.java Log Message: Working on registration of scale-out index with the metadata service - commit prior to refactor to use versioning in the MetadataIndex class. Index: PartitionMetadata.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/PartitionMetadata.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** PartitionMetadata.java 16 Apr 2007 10:02:49 -0000 1.8 --- PartitionMetadata.java 23 Apr 2007 17:22:16 -0000 1.9 *************** *** 63,68 **** * index? * - * @todo aggregate resource load statistics. - * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ --- 63,66 ---- *************** *** 78,81 **** --- 76,82 ---- * The ordered list of data services on which data for this partition will * be written and from which data for this partition may be read. + * + * @todo refactor into a dataService UUID (required) and an array of zero or + * more media replication services for failover. */ final protected UUID[] dataServices; |
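The @todo added to PartitionMetadata above proposes splitting the flat `UUID[] dataServices` into a single required primary data service plus an ordered chain of zero or more media replication services for failover. A hypothetical sketch of that shape; the class and field names are illustrative only, not the eventual refactoring.

```java
import java.util.UUID;

// Hypothetical sketch of the refactoring proposed in the PartitionMetadata
// @todo: one primary (the single point of write responsibility) plus a
// downstream media replication chain for failover reads.
class PartitionMetadataSketch {

    final int partitionId;
    /** The service with sole write responsibility for the partition. */
    final UUID dataService;
    /** Downstream media replication pipeline; reads may fail over here. */
    final UUID[] replicationChain;

    PartitionMetadataSketch(int partitionId, UUID dataService, UUID... replicationChain) {
        if (dataService == null)
            throw new IllegalArgumentException("primary data service is required");
        this.partitionId = partitionId;
        this.dataService = dataService;
        this.replicationChain = replicationChain.clone();
    }

    /** All services from which the partition may be read, primary first. */
    UUID[] readOrder() {
        UUID[] all = new UUID[1 + replicationChain.length];
        all[0] = dataService;
        System.arraycopy(replicationChain, 0, all, 1, replicationChain.length);
        return all;
    }
}
```

Making the primary mandatory (and the chain possibly empty) captures the "required" vs "zero or more" distinction drawn in the @todo.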
From: Bryan T. <tho...@us...> - 2007-04-23 17:22:20
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/btree In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv21228/src/java/com/bigdata/btree Modified Files: ReadOnlyFusedView.java Log Message: Working on registration of scale-out index with the metadata service - commit prior to refactor to use versioning in the MetadataIndex class. Index: ReadOnlyFusedView.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/btree/ReadOnlyFusedView.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** ReadOnlyFusedView.java 13 Apr 2007 15:04:12 -0000 1.1 --- ReadOnlyFusedView.java 23 Apr 2007 17:22:16 -0000 1.2 *************** *** 126,130 **** "source used more than once"); ! if (srcs[i].getIndexUUID().equals(srcs[j].getIndexUUID())) { throw new IllegalArgumentException( "Sources have different index UUIDs"); --- 126,130 ---- "source used more than once"); ! if (! srcs[i].getIndexUUID().equals(srcs[j].getIndexUUID())) { throw new IllegalArgumentException( "Sources have different index UUIDs"); |
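The one-character fix above adds a missing negation: before it, ReadOnlyFusedView rejected sources whose index UUIDs were EQUAL, when the intent is to reject sources whose UUIDs DIFFER. A distilled, self-contained version of the pairwise validation loop with the corrected test; plain objects and UUIDs stand in for the actual AbstractBTree sources.

```java
import java.util.UUID;

// Distilled version of the validation the ReadOnlyFusedView fix is about:
// every source must be distinct, and all sources must belong to the same
// scale-out index (same index UUID).
class FusedViewChecks {

    static void validateSources(Object[] srcs, UUID[] indexUUIDs) {
        for (int i = 0; i < srcs.length; i++) {
            for (int j = i + 1; j < srcs.length; j++) {
                if (srcs[i] == srcs[j])
                    throw new IllegalArgumentException("source used more than once");
                // the fix: reject when the UUIDs are NOT equal
                if (!indexUUIDs[i].equals(indexUUIDs[j]))
                    throw new IllegalArgumentException("sources have different index UUIDs");
            }
        }
    }
}
```

Without the negation the check inverted its meaning silently, since any well-formed fused view (all sources sharing one index UUID) would have tripped the exception while mismatched sources sailed through.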
From: Bryan T. <tho...@us...> - 2007-04-23 13:10:20
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv20436/src/java/com/bigdata/service Modified Files: AbstractClient.java AbstractServer.java DataServer.java MapReduceService.java IMetadataService.java MetadataService.java MetadataServer.java Added Files: ClientIndexView.java Log Message: The metadata server now tracks join/leaves of data services within a managed set of registrars and has a test case for this feature. Index: DataServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServer.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** DataServer.java 21 Apr 2007 10:37:16 -0000 1.6 --- DataServer.java 23 Apr 2007 13:09:29 -0000 1.7 *************** *** 55,59 **** import com.bigdata.journal.IJournal; - import com.sun.jini.start.LifeCycle; /** --- 55,58 ---- *************** *** 64,70 **** --- 63,79 ---- * passed to the {@link DataServer#DataServer(String[])} constructor or * {@link #main(String[])}. + * <p> * * @see src/resources/config for sample configurations. * + * @todo describe and implement the media replication mechanism and service + * failover. only the primary service is mutable. service instances for + * the same persistent data must determine whether or not another service + * is running (basically, can they obtain a lock to operate as the primary + * service). secondary service instances may provide redundent read-only + * operations from replicated media (or media on a distributed file + * system). the failover mechanisms are essentially the same for the data + * service and for the derived metadata service. + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ *************** *** 84,92 **** } ! public DataServer(String[] args, LifeCycle lifeCycle) { ! ! super( args, lifeCycle ); ! ! } /** --- 93,101 ---- } ! 
// public DataServer(String[] args, LifeCycle lifeCycle) { ! // ! // super( args, lifeCycle ); ! // ! // } /** *************** *** 203,208 **** // // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 212,215 ---- *************** *** 212,217 **** // log.info(""); // - // // TODO Auto-generated method stub - // // } // --- 219,222 ---- *************** *** 219,224 **** // // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 224,227 ---- *************** *** 228,232 **** // log.info(""); // - // // TODO Auto-generated method stub // return null; // } --- 231,234 ---- *************** *** 236,240 **** // log.info(""); // - // // TODO Auto-generated method stub // return null; // } --- 238,241 ---- *************** *** 244,248 **** // log.info(""); // - // // TODO Auto-generated method stub // return null; // } --- 245,248 ---- *************** *** 251,256 **** // // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 251,254 ---- *************** *** 259,270 **** // log.info(""); // - // // TODO Auto-generated method stub - // // } // // public void removeLookupLocators(LookupLocator[] arg0) throws RemoteException { // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 257,264 ---- *************** *** 272,277 **** // public void setLookupGroups(String[] arg0) throws RemoteException { // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 266,269 ---- *************** *** 279,284 **** // public void setLookupLocators(LookupLocator[] arg0) throws RemoteException { // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 271,274 ---- Index: AbstractServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/AbstractServer.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** AbstractServer.java 20 Apr 2007 16:36:27 -0000 1.6 --- AbstractServer.java 23 Apr 
2007 13:09:29 -0000 1.7 *************** *** 95,99 **** * * <pre> ! * java -Djava.security.policy=policy.all -cp lib\jini-ext.jar;lib\start.jar com.sun.jini.start.ServiceStarter src/test/com/bigdata/service/TestServerStarter.config * </pre> * --- 95,99 ---- * * <pre> ! * java -Djava.security.policy=policy.all -cp lib\jini-ext.jar;lib\start.jar com.sun.jini.start.ServiceStarter src/test/com/bigdata/service/TestServerStarter.config * </pre> * *************** *** 139,144 **** * * @todo the {@link DestroyAdmin} implementation on the {@link DataServer} is ! * not working correctly. Untangle the various ways in which things can ! * be stopped vs destroyed. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> --- 139,147 ---- * * @todo the {@link DestroyAdmin} implementation on the {@link DataServer} is ! * not working correctly. Untangle the various ways in which things can be ! * stopped vs destroyed. ! * ! * @todo document exit status codes and unify their use in this and derived ! * classes. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> *************** *** 165,169 **** private ServiceID serviceID; private DiscoveryManagement discoveryManager; ! private JoinManager joinManager; private Configuration config; /** --- 168,172 ---- private ServiceID serviceID; private DiscoveryManagement discoveryManager; ! private JoinManager joinManager; private Configuration config; /** *************** *** 187,190 **** --- 190,201 ---- /** + * The object used to inform the hosting environment that the server is + * unregistering (terminating). A fake object is used when the server is run + * from the command line, otherwise the object is supplied by the + * {@link NonActivatableServiceDescriptor}. + */ + private LifeCycle lifeCycle; + + /** * Return the assigned {@link ServiceID}. */ *************** *** 194,205 **** } ! /** ! * The object used to inform the hosting environment that the server is ! * unregistering (terminating). 
A fake object is used when the server is run ! * from the command line, otherwise the object is supplied by the ! * {@link NonActivatableServiceDescriptor}. ! */ ! private LifeCycle lifeCycle; /** --- 205,216 ---- } + + protected DiscoveryManagement getDiscoveryManagement() { + return discoveryManager; + } ! protected JoinManager getJoinManager() { ! return joinManager; ! } /** *************** *** 228,232 **** * @see NonActivatableServiceDescriptor */ ! protected AbstractServer(String[] args, LifeCycle lifeCycle ) { if (lifeCycle == null) --- 239,243 ---- * @see NonActivatableServiceDescriptor */ ! private AbstractServer(String[] args, LifeCycle lifeCycle ) { if (lifeCycle == null) *************** *** 235,239 **** this.lifeCycle = lifeCycle; ! // @todo verify that this belongs here. System.setSecurityManager(new SecurityManager()); --- 246,250 ---- this.lifeCycle = lifeCycle; ! // @todo verify that this belongs here vs in a main(String[]). System.setSecurityManager(new SecurityManager()); *************** *** 330,333 **** --- 341,345 ---- open = true; + log.info("Impl is "+impl); log.info("Proxy is " + proxy + "(" + proxy.getClass() + ")"); *************** *** 356,360 **** */ discoveryManager = new LookupDiscoveryManager( ! groups, unicastLocators, null // DiscoveryListener ); --- 368,372 ---- */ discoveryManager = new LookupDiscoveryManager( ! groups, unicastLocators, null /*DiscoveryListener*/ ); *************** *** 368,372 **** joinManager = new JoinManager(proxy, // service proxy entries, // attr sets ! serviceID, // ServiceIDListener discoveryManager, // DiscoveryManager new LeaseRenewalManager()); --- 380,384 ---- joinManager = new JoinManager(proxy, // service proxy entries, // attr sets ! serviceID, // ServiceID discoveryManager, // DiscoveryManager new LeaseRenewalManager()); *************** *** 511,514 **** --- 523,544 ---- } + // /* + // * DiscoveryListener + // */ + // + // /** + // * NOP. 
+ // */ + // public void discovered(DiscoveryEvent arg0) { + // log.info("DiscoveryListener.discovered: "+arg0); + // } + // + // /** + // * NOP. + // */ + // public void discarded(DiscoveryEvent arg0) { + // log.info("DiscoveryListener.discarded: "+arg0); + // } + /** * Shutdown the server taking time only to unregister it from jini. --- NEW FILE: ClientIndexView.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. 
Modifications: */ /* * Created on Apr 22, 2007 */ package com.bigdata.service; import java.util.UUID; import com.bigdata.btree.BatchContains; import com.bigdata.btree.BatchInsert; import com.bigdata.btree.BatchLookup; import com.bigdata.btree.BatchRemove; import com.bigdata.btree.IEntryIterator; import com.bigdata.btree.IIndex; import com.bigdata.scaleup.PartitionedIndexView; /** * A client-side view of an index. * * @todo cache leased information about index partitions of interest to the * client. The cache will be a little tricky since we need to know when * the client does not possess a partition definition. Index partitions * are defined by the separator key - the first key that lies beyond that * partition. the danger then is that a client will presume that any key * before the first leased partition is part of that first partition. To * guard against that the client needs to know both the separator key that * represents the upper and lower bounds of each partition. If a lookup in * the cache falls outside of any known partitions upper and lower bounds * then it is a cache miss and we have to ask the metadata service for a * lease on the partition. the cache itself is just a btree data structure * with the proviso that some cache entries represent missing partition * definitions (aka the lower bounds for known partitions where the left * sibling partition is not known to the client). * * @todo support partitioned indices by resolving client operations against each * index partition as necessary and maintaining leases with the metadata * service - the necessary logic is in the {@link PartitionedIndexView} * and can be refactored for this purpose. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class ClientIndexView implements IIndex { protected final AbstractClient client; protected final String name; protected IMetadataService mdproxy; protected UUID indexUUID; // @todo final. 
public ClientIndexView(AbstractClient client, String name) { if(client==null) throw new IllegalArgumentException(); if(name==null) throw new IllegalArgumentException(); this.client = client; this.name = name; /* * obtain the proxy for a metadata service. if this instance fails, then * we can always ask for a new instance for the same distributed * database (failover). */ // mdproxy = client.getMetadataService(); /* * obtain the indexUUID. this is a means of verifying that the index * exists. */ // indexUUID = mdproxy.getIndexUUID(name); } public UUID getIndexUUID() { return indexUUID; } public boolean contains(byte[] key) { // TODO Auto-generated method stub return false; } public Object insert(Object key, Object value) { // TODO Auto-generated method stub return null; } public Object lookup(Object key) { // TODO Auto-generated method stub return null; } public Object remove(Object key) { // TODO Auto-generated method stub return null; } public int rangeCount(byte[] fromKey, byte[] toKey) { // TODO Auto-generated method stub return 0; } public IEntryIterator rangeIterator(byte[] fromKey, byte[] toKey) { // TODO Auto-generated method stub return null; } public void contains(BatchContains op) { // TODO Auto-generated method stub } public void insert(BatchInsert op) { // TODO Auto-generated method stub } public void lookup(BatchLookup op) { // TODO Auto-generated method stub } public void remove(BatchRemove op) { // TODO Auto-generated method stub } } Index: AbstractClient.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/AbstractClient.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** AbstractClient.java 27 Mar 2007 14:34:23 -0000 1.1 --- AbstractClient.java 23 Apr 2007 13:09:29 -0000 1.2 *************** *** 117,125 **** /** - * The exported proxy for the service implementation object. 
- */ - protected Remote proxy; - - /** * Server startup reads {@link Configuration} data from the file(s) named by * <i>args</i>, starts the service, and advertises the service for --- 117,120 ---- Index: MetadataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MetadataService.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** MetadataService.java 27 Mar 2007 17:11:42 -0000 1.4 --- MetadataService.java 23 Apr 2007 13:09:29 -0000 1.5 *************** *** 79,83 **** * corresponds to the commit time of interest for the database.) */ ! public class MetadataService implements IMetadataService, IServiceShutdown { /** --- 79,83 ---- * corresponds to the commit time of interest for the database.) */ ! public class MetadataService extends DataService implements IMetadataService, IServiceShutdown { /** *************** *** 91,100 **** public MetadataService(Properties properties) { ! /* ! * @todo setup/resolve the journal and the metadata index on ! * the journal. ! */ ! ! journal = new Journal(properties); } --- 91,95 ---- public MetadataService(Properties properties) { ! 
super(properties); } *************** *** 105,127 **** } - public int getEntryCount(String name) { - // TODO Auto-generated method stub - return 0; - } - - public int rangeCount(String name,byte[] fromKey,byte[] toKey) { - // TODO Auto-generated method stub - return 0; - } - - public void shutdown() { - // TODO Auto-generated method stub - - } - - public void shutdownNow() { - // TODO Auto-generated method stub - - } - } --- 100,102 ---- Index: MetadataServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MetadataServer.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** MetadataServer.java 20 Apr 2007 16:36:27 -0000 1.2 --- MetadataServer.java 23 Apr 2007 13:09:29 -0000 1.3 *************** *** 48,61 **** package com.bigdata.service; import java.rmi.Remote; import java.rmi.RemoteException; import java.util.Properties; ! import net.jini.core.lookup.ServiceMatches; ! import net.jini.core.lookup.ServiceRegistrar; import net.jini.core.lookup.ServiceTemplate; ! ! import com.bigdata.journal.IJournal; ! import com.sun.jini.start.LifeCycle; /** --- 48,67 ---- package com.bigdata.service; + import java.io.IOException; import java.rmi.Remote; import java.rmi.RemoteException; + import java.util.Map; import java.util.Properties; + import java.util.concurrent.ConcurrentHashMap; ! import net.jini.core.lookup.ServiceID; ! import net.jini.core.lookup.ServiceItem; import net.jini.core.lookup.ServiceTemplate; ! import net.jini.lease.LeaseRenewalManager; ! import net.jini.lookup.LookupCache; ! import net.jini.lookup.ServiceDiscoveryEvent; ! import net.jini.lookup.ServiceDiscoveryListener; ! import net.jini.lookup.ServiceDiscoveryManager; ! import net.jini.lookup.ServiceItemFilter; /** *************** *** 70,80 **** * the same group. While running, it tracks when data services start and stop so * that it can (re-)allocate index partitions as necessary. ! * <p> ! 
* The metadata server uses a write through pipeline to replicate its data onto ! * registered secondary metadata servers. If the metadata server fails, clients ! * will automatically fail over to a secondary metadata server. Only the primary ! * metadata server actively tracks the state of data services since secondaries ! * are updated via the write through pipeline to ensure consistency. Secondary ! * metadata servers will notice if the primary dies and elect a new master. * * @todo note that the service update registration is _persistent_ (assuming --- 76,83 ---- * the same group. While running, it tracks when data services start and stop so * that it can (re-)allocate index partitions as necessary. ! * ! * @todo aggregate host load data and service RPC events and report them ! * periodically so that we can track load and make load balancing ! * decisions. * * @todo note that the service update registration is _persistent_ (assuming *************** *** 82,90 **** * wrinkle to how a bigdata instance must be configured. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public class MetadataServer extends AbstractServer { /** * @param args --- 85,106 ---- * wrinkle to how a bigdata instance must be configured. * + * @todo should destroy destroy the service instance or the persistent state as + * well? Locally, or as replicated? + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public class MetadataServer extends DataServer implements ServiceDiscoveryListener { + private ServiceDiscoveryManager serviceDiscoveryManager = null; + private LookupCache lookupCache = null; + private Map<ServiceID, ServiceItem> serviceIdMap = new ConcurrentHashMap<ServiceID, ServiceItem>(); + + protected LookupCache getLookupCache() { + + return lookupCache; + + } + /** * @param args *************** *** 94,111 **** super(args); ! } ! /** ! * @param args ! * @param lifeCycle ! */ ! 
public MetadataServer(String[] args, LifeCycle lifeCycle) { ! ! super(args, lifeCycle); ! ! ! } protected Remote newService(Properties properties) { --- 110,164 ---- super(args); ! /* ! * Setup a helper class that will be notified as services join or leave ! * the various registrars to which the metadata server is listening. ! */ ! try { ! serviceDiscoveryManager = new ServiceDiscoveryManager(getDiscoveryManagement(), ! new LeaseRenewalManager()); ! ! } catch(IOException ex) { ! ! log.error("Could not initiate service discovery manager", ex); ! ! System.exit(1); ! ! } ! ! /* ! * Setup a lookupCache that will be populated with all services that match a ! * filter. This is used to keep track of all data services registered ! * with any service registrar to which the metadata server is listening. ! */ ! try { ! ! ServiceTemplate template = new ServiceTemplate(null, ! new Class[] { IDataService.class }, null); ! ! lookupCache = serviceDiscoveryManager ! .createLookupCache(template, ! new DataServiceFilter() /* filter */, this /* ServiceDiscoveryListener */); ! ! } catch(RemoteException ex) { ! ! log.error("Could not setup lookup lookupCache", ex); ! ! System.exit(1); ! ! } ! } + // /** + // * @param args + // * @param lifeCycle + // */ + // public MetadataServer(String[] args, LifeCycle lifeCycle) { + // + // super(args, lifeCycle); + // + // } + protected Remote newService(Properties properties) { *************** *** 115,212 **** /** ! * Return an {@link IMetadataService}. * ! * @param registrar ! * A service registrar to query. ! * ! * @return An {@link IMetadataService} if one was found using that ! * registrar. */ ! public IMetadataService getMetadataService(ServiceRegistrar registrar) { ! Class[] classes = new Class[] {IMetadataService.class}; ! ServiceTemplate template = new ServiceTemplate(null, classes, null); ! IMetadataService proxy = null; ! try { ! proxy = (IMetadataService) registrar.lookup(template); ! ! } catch(java.rmi.RemoteException e) { ! 
log.warn(e); ! ! } ! return proxy; } /** ! * Return the data service(s) matched on this registrar. ! * ! * @param registrar ! * ! * @return The data service or <code>null</code> if none was matched. * ! * @todo we need to describe the services to be discovered by their primary ! * interface and only search within a designated group that ! * corresponds to the bigdata federation of interest - that group is ! * part of the client configuration. * ! * @todo how do we ensure that we have seen all data services? If we query ! * each registrar as it is discovered and then register for updates ! * there are two ways in which we could miss some instances: (1) new ! * data services register between the query and the registration for ! * updates; and (2) the query will not return _ALL_ data services ! * registered, but only as many as the match limit. */ ! public ServiceMatches getDataServices(ServiceRegistrar registrar) { ! Class[] classes = new Class[] {IDataService.class}; ! ServiceTemplate template = new ServiceTemplate(null, classes, null); ! try { - return registrar.lookup(template,0); - - } catch(java.rmi.RemoteException e) { - - log.warn(e); - - return null; - - } - } /** ! * Extends the behavior to close and delete the journal in use by the ! * metadata service. */ public void destroy() { - - MetadataService service = (MetadataService)impl; ! super.destroy(); ! try { ! ! IJournal journal = service.journal; ! ! log.info("Closing and deleting: "+journal.getFile()); ! ! journal.closeAndDelete(); ! ! log.info("Journal deleted."); ! ! } catch (Throwable t) { ! ! log.warn("Could not delete journal: " + t, t); ! ! } } --- 168,543 ---- /** ! * Filter matches a {@link DataService} but not a {@link MetadataService}. ! * <p> * ! * @todo This explicitly filters out service variants that extend ! * {@link DataService} but which are not tasked as a ! * {@link DataService} by the {@link MetadataService}. It would be ! 
* easier if we refactored the interface hierarchy a bit so that there ! * was a common interface and abstract class extended by both the ! * {@link DataService} and the {@link MetadataService} such that we ! * could match on their specific interfaces without the possibility of ! * confusion. ! * ! * @author <a href="mailto:tho...@us...">Bryan Thompson</a> ! * @version $Id$ */ ! public static class DataServiceFilter implements ServiceItemFilter { ! ! public boolean check(ServiceItem item) { ! ! if(item.service==null) { ! ! log.warn("Service is null: "+item); ! ! return false; ! ! } ! ! if(!(item.service instanceof IMetadataService)) { ! ! log.info("Matched: "+item); ! ! return true; ! ! } ! ! log.debug("Ignoring: "+item); ! ! return false; ! ! } ! }; ! /* ! * ServiceDiscoveryListener. ! */ ! ! /** ! * Adds the {@link ServiceItem} to the internal map to support {@link #getDataServiceByID(ServiceID)}. ! * <p> ! * Note: This event is generated by the {@link LookupCache}. There is an ! * event for each {@link DataService} as it joins any registrar in the set ! * of registrars to which the {@link MetadataServer} is listening. The set ! * of distinct joined {@link DataService}s is accessible via the ! * {@link LookupCache}. ! */ ! public void serviceAdded(ServiceDiscoveryEvent e) { ! log.info("" + e + ", class=" ! + e.getPostEventServiceItem().toString()); ! serviceIdMap.put(e.getPostEventServiceItem().serviceID, e ! .getPostEventServiceItem()); ! } ! /** ! * Updates the cached {@link ServiceItem} in the internal map. ! */ ! public void serviceChanged(ServiceDiscoveryEvent e) { ! log.info(""+e+", class=" ! + e.getPostEventServiceItem().toString()); ! ! serviceIdMap.put(e.getPostEventServiceItem().serviceID, e ! .getPostEventServiceItem()); } /** ! * Removes the {@link ServiceItem} from the internal map. ! */ ! public void serviceRemoved(ServiceDiscoveryEvent e) { ! ! log.info(""+e+", class=" ! + e.getPreEventServiceItem().toString()); ! ! serviceIdMap.remove(e.getPreEventServiceItem().serviceID); ! ! } ! ! /** ! * Resolve the {@link ServiceID} for a {@link DataService} to the cached ! 
* {@link ServiceItem} for that {@link DataService}. * ! * @param serviceID ! * The {@link ServiceID} for the {@link DataService}. * ! * @return The cache {@link ServiceItem} for that {@link DataService}. */ ! public ServiceItem getDataServiceByID(ServiceID serviceID) { ! return serviceIdMap.get(serviceID); ! } ! ! /** ! * Return the #of {@link DataService}s known to this {@link MetadataServer}. ! * ! * @return The #of {@link DataService}s in the {@link LookupCache}. ! */ ! public int getDataServiceCount() { ! return serviceIdMap.size(); } + // /* + // * DiscoveryListener + // */ + // + // /** + // * Extends base implementation to register for notification of + // * {@link DataService} join/leave events + // * + // * @todo this must be a low-latency handler. + // * + // * @todo pay attention iff registrar is for a group that is used by this + // * metadata service as configured. + // * + // * @todo figure out delta in registrars and in group-to-registrar mapping. + // * + // * @todo register for join/leave events for {@link IDataService} on each new + // * registrar. + // * + // * @todo task a worker to query the new registrar(s) for any existing + // * {@link IDataService}s and continue to query until all such + // * services on the registrar have been discovered. This is the pool of + // * {@link DataService}s that are available to the + // * {@link MetadataService} for management. 
+ // */ + // public void discovered(DiscoveryEvent e) { + // + // super.discovered(e); + // + // ServiceRegistrar[] registrars = e.getRegistrars(); + // + // Map<Integer,String[]> registrarToGroups = (Map<Integer,String[]>) e.getGroups(); + // + // final int nregistrars = registrars.length; + // + // log.info("Reported: "+nregistrars+" registrars"); + // + // ServiceTemplate template = new ServiceTemplate(null, + // new Class[] { IDataService.class }, null); + // + // final long leaseDuration = 5000; + // + // final MarshalledObject handbackObject; + // + // try { + // + // handbackObject = new MarshalledObject("handback"); + // + // } catch (IOException ex) { + // + // // the string must be serializable.... + // throw new AssertionError(ex); + // + // } + // + // for(int i=0; i<nregistrars; i++) { + // + // ServiceRegistrar registrar = registrars[i]; + // + // ServiceMatches matches = getDataServices(registrar); + // + // log.info("Reported: "+matches.totalMatches+" data services on registrar"); + // + // /* + // * Note: This is a persistent registration. + // * + // * @todo share a single remote listener object for all registered + // * events? + // * + // * @todo match all appropriate transitions. + // * + // * @todo explore uses for the handback object - should there be one + // * per registrar? it can serve as a key that identifies the + // * registrar.... + // * + // * @todo the last argument is a lease duration. we will have to + // * renew the lease. see what management classes exist to make this + // * easier ( LeaseRenewalManager ). + // */ + // try { + // + // EventRegistration reg = registrar.notify(template, + // ServiceRegistrar.TRANSITION_NOMATCH_MATCH // created. + //// |ServiceRegistrar.TRANSITION_MATCH_MATCH // modified. + // |ServiceRegistrar.TRANSITION_MATCH_NOMATCH // deleted. 
+ // , new NotifyListener(), handbackObject, leaseDuration); + // + // System.err.println("EventRegistration: "+reg); + // + // } catch (RemoteException ex) { + // + // log.error("Could not register for notification", ex); + // + // } + // + // } + // + // } + // + // /** + // * @todo When a registrar is discarded, do we have to do anything? If the + // * registrar was providing redundency, then we can still reach the + // * various data services. If the registrar was the sole resolver for + // * some services then those services are no longer available -- and + // * probably an WARNing should be logged. + // */ + // public void discarded(DiscoveryEvent e) { + // + // super.discarded(e); + // + // } + // + // /** + // * RemoteEventListener - events are generated by our registered persistent + // * notification with one or more service registrars that we use to listen + // * for join/leave of data services. + // * <p> + // * This class extends {@link UnicastRemoteObject} so that it can run in the + // * {@link ServiceRegistrar}. The class MUST be located in the unpacked JAR + // * identified by the <em>codebase</em>. + // * + // * @todo I have not figured out yet how to get the events back to the + // * {@link MetadataServer} - they are being written in the console in + // * which jini is running since they are received remotely and then + // * need to be passed back to the {@link MetadataServer} somehow. + // * + // * @todo perhaps pass in the {@link ServiceRegistrar} or the + // * {@link MarshalledObject} so that we can identify the service for + // * which the event was generated and pass the event to code on the + // * {@link MetadataServer} instance (transient reference?) that will + // * actually handle the event (notice the join/leave of a data + // * service). 
+ // * + // * @see http://archives.java.sun.com/cgi-bin/wa?A2=ind0410&L=jini-users&D=0&P=30410 + // * + // * @see http://archives.java.sun.com/cgi-bin/wa?A2=ind0410&L=jini-users&D=0&P=29391 + // * + // * @see ServiceDiscoveryManager which can encapsulate the entire problem of + // * listening and also enumerating the existing services. However, its + // * use is limited by NAT (it will not cross a filewall). + // */ + // public static class NotifyListener extends UnicastRemoteObject implements RemoteEventListener { + // + // /** + // * + // */ + // private static final long serialVersionUID = -5847172051441883860L; + // + // public NotifyListener() throws RemoteException { + // super(); + // } + // + // public NotifyListener(int port) throws RemoteException { + // super(port); + // } + // + // /** + // * + // * @param e + // * @throws UnknownEventException + // * @throws RemoteException + // */ + // public void notify(RemoteEvent e) throws UnknownEventException, RemoteException { + // + // System.err.println("notify(RemoveEvent:"+e+")"); + // log.info(e.toString()); + // + // } + // + // } + + // /** + // * Return an {@link IMetadataService}. + // * + // * @param registrar + // * A service registrar to query. + // * + // * @return An {@link IMetadataService} if one was found using that + // * registrar. + // */ + // public IMetadataService getMetadataService(ServiceRegistrar registrar) { + // + // Class[] classes = new Class[] {IMetadataService.class}; + // + // ServiceTemplate template = new ServiceTemplate(null, classes, null); + // + // IMetadataService proxy = null; + // + // try { + // + // proxy = (IMetadataService) registrar.lookup(template); + // + // } catch(java.rmi.RemoteException e) { + // + // log.warn(e); + // + // } + // + // return proxy; + // + // } + // + // /** + // * Return the data service(s) matched on this registrar. + // * + // * @param registrar + // * The {@link ServiceRegistrar} to be queried. 
+ // * + // * @return The data service or <code>null</code> if none was matched. + // * + // * @todo we need to describe the services to be discovered by their primary + // * interface and only search within a designated group that + // * corresponds to the bigdata federation of interest - that group is + // * part of the client configuration. + // * + // * @todo we need to filter out matches on {@link MetadataService} since it + // * extends {@link DataService}. + // * + // * @todo how do we ensure that we have seen all data services? If we query + // * each registrar as it is discovered and then register for updates + // * there are two ways in which we could miss some instances: (1) new + // * data services register between the query and the registration for + // * updates; and (2) the query will not return _ALL_ data services + // * registered, but only as match as the match limit. + // */ + // public ServiceMatches getDataServices(ServiceRegistrar registrar) { + // + // Class[] classes = new Class[] {IDataService.class}; + // + // ServiceTemplate template = new ServiceTemplate(null, classes, null); + // + // try { + // + // return registrar.lookup(template,0); + // + // } catch(java.rmi.RemoteException e) { + // + // log.warn(e); + // + // return null; + // + // } + // + // } + // /** ! * Extends the behavior to terminate {@link LookupCache} and ! * {@link ServiceDiscoveryManager} processing. */ public void destroy() { ! lookupCache.terminate(); ! serviceDiscoveryManager.terminate(); ! ! super.destroy(); } Index: MapReduceService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MapReduceService.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** MapReduceService.java 22 Mar 2007 15:04:15 -0000 1.2 --- MapReduceService.java 23 Apr 2007 13:09:29 -0000 1.3 *************** *** 255,259 **** IMetadataService mds = getMetadataService(); ! 
final int nentries = mds.rangeCount(name,fromKey,toKey); return null; --- 255,260 ---- IMetadataService mds = getMetadataService(); ! final int nentries = mds.rangeCount(IDataService.UNISOLATED, name, ! fromKey, toKey); return null; Index: IMetadataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IMetadataService.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** IMetadataService.java 13 Apr 2007 15:04:19 -0000 1.4 --- IMetadataService.java 23 Apr 2007 13:09:29 -0000 1.5 *************** *** 63,87 **** * compatible with RMI. * - * @todo extend IDataService - * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public interface IMetadataService extends Remote { ! ! /** ! * The approximate number of entries in the index (non-transactional). ! */ ! public int getEntryCount(String name) throws IOException; ! ! /** ! * The approximate number of entries in the index for the specified key ! * range (non-transactional). ! * ! * @param fromKey ! * @param toKey ! * @return ! */ ! public int rangeCount(String name,byte[] fromKey,byte[] toKey) throws IOException; /** --- 63,70 ---- * compatible with RMI. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public interface IMetadataService extends IDataService { /** |
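The commit above replaces per-registrar polling with a `LookupCache` that pushes `DataService` join/leave events into a concurrent map keyed by `ServiceID`. Stripped of the Jini plumbing, that bookkeeping amounts to the sketch below; the `ServiceTracker` class, its `String`-keyed map, and the method names are illustrative stand-ins, not part of the bigdata API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the join/leave bookkeeping that MetadataServer layers on
// top of its LookupCache. In the real code the key is a
// net.jini.core.lookup.ServiceID and the value is the ServiceItem delivered
// in each ServiceDiscoveryEvent; plain Strings/Objects stand in here.
public class ServiceTracker {

    // Maps a service identifier to an opaque service item (proxy, attributes).
    private final Map<String, Object> serviceIdMap =
            new ConcurrentHashMap<String, Object>();

    // Mirrors serviceAdded(ServiceDiscoveryEvent): cache the post-event item.
    public void serviceAdded(String serviceID, Object item) {
        serviceIdMap.put(serviceID, item);
    }

    // Mirrors serviceRemoved(ServiceDiscoveryEvent): drop the pre-event item.
    public void serviceRemoved(String serviceID) {
        serviceIdMap.remove(serviceID);
    }

    // Mirrors getDataServiceByID(ServiceID).
    public Object getServiceByID(String serviceID) {
        return serviceIdMap.get(serviceID);
    }

    // Mirrors getDataServiceCount().
    public int getServiceCount() {
        return serviceIdMap.size();
    }

    public static void main(String[] args) {
        ServiceTracker tracker = new ServiceTracker();
        tracker.serviceAdded("dataService0", "proxy0");
        tracker.serviceAdded("dataService1", "proxy1");
        System.out.println(tracker.getServiceCount()); // 2
        tracker.serviceRemoved("dataService0");
        System.out.println(tracker.getServiceCount()); // 1
        System.out.println(tracker.getServiceByID("dataService0")); // null
    }
}
```

A `ConcurrentHashMap` is the natural choice here because the `ServiceDiscoveryListener` callbacks arrive on Jini's event threads while `getDataServiceByID`/`getDataServiceCount` are read from elsewhere.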
From: Bryan T. <tho...@us...> - 2007-04-23 13:09:55
|
Update of /cvsroot/cweb/bigdata/src/resources/config/standalone In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv20436/src/resources/config/standalone Modified Files: ServerStarter.config Added Files: MetadataServer0.properties MetadataServer0.config Log Message: The metadata server now tracks join/leaves of data services within a managed set of registrars and has a test case for this feature. Index: ServerStarter.config =================================================================== RCS file: /cvsroot/cweb/bigdata/src/resources/config/standalone/ServerStarter.config,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** ServerStarter.config 23 Mar 2007 20:01:25 -0000 1.1 --- ServerStarter.config 23 Apr 2007 13:09:29 -0000 1.2 *************** *** 74,77 **** --- 74,84 ---- static serviceDescriptors = new ServiceDescriptor[] { /* + * metadata server(s) + */ + new NonActivatableServiceDescriptor( + codebase, policy, classpath, + "com.bigdata.service.MetadataServer", + new String[] { "src/resources/config/standalone/MetadataServer0.config" }) + /* * data server(s) */ --- NEW FILE: MetadataServer0.config --- import java.io.File; import net.jini.jeri.BasicILFactory; import net.jini.jeri.BasicJeriExporter; import net.jini.jeri.tcp.TcpServerEndpoint; import net.jini.discovery.LookupDiscovery; import net.jini.core.discovery.LookupLocator; import net.jini.core.entry.Entry; import net.jini.lookup.entry.Name; import net.jini.lookup.entry.Comment; import net.jini.lookup.entry.Address; import net.jini.lookup.entry.Location; import net.jini.lookup.entry.ServiceInfo; /* * Declares how the service will provision itself. */ ServiceDescription { /* * This object is used to export the service proxy. The choice here affects * the protocol that will be used for communications between the clients and * the service. * * @todo Explore JERI nio option and customization support for serialization. 
*/ exporter = new BasicJeriExporter(TcpServerEndpoint.getInstance(0), new BasicILFactory()); /* * The name of the property file containing the configuration information for * the service itself (where it will locate its files, etc). */ propertyFile = new File("src/resources/config/standalone/MetadataServer0.properties"); /* * The file on which the serviceID will be written. */ serviceIdFile = new File("MetadataServer0.id"); } /* * Declares how the service will advertise itself. */ AdvertDescription { /* * Entry attributes used to describe the service. */ entries = new Entry[] { new Name("MetadataService0"), // human facing name. new ServiceInfo("bigdata", // product or package name "SYSTAP,LLC", // manufacturer "SYSTAP,LLC", // vendor "0.1-beta", // version "MetadataService", // model "serial#" // serialNumber ) }; /* * Note: multicast discovery is always used if LookupDiscovery.ALL_GROUPS is * specified. */ // groups = LookupDiscovery.ALL_GROUPS; groups = new String[]{"bigdata"}; /* * One or more unicast URIs of the form jini://host/ or jini://host:port/. * This MAY be an empty array if you want to use multicast discovery _and_ * you have specified LookupDiscovery.ALL_GROUPS above. */ unicastLocators = new LookupLocator[] { // empty new LookupLocator("jini://localhost/") }; } --- NEW FILE: MetadataServer0.properties --- # DataServer configuration. file=MetadataServer0.jnl |
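The `propertyFile` entry in `MetadataServer0.config` points at a standard `java.util.Properties` file, and the only key shown in the new `MetadataServer0.properties` is `file`, which names the backing journal. Loading such a file is plain JDK code; the sketch below is illustrative (the `LoadServerProperties` class is not part of bigdata — in the real code this happens inside the server startup path).

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

// Sketch of loading the per-service property file named by the propertyFile
// entry in MetadataServer0.config. Class and method names are illustrative.
public class LoadServerProperties {

    public static Properties load(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Default path matches the config file above; run from the module root.
        String path = args.length > 0 ? args[0]
                : "src/resources/config/standalone/MetadataServer0.properties";
        Properties props = load(path);
        // "file" is the only key shown in the NEW FILE above: the journal name.
        System.out.println(props.getProperty("file"));
    }
}
```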
From: Bryan T. <tho...@us...> - 2007-04-23 13:09:42
|
Update of /cvsroot/cweb/bigdata In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv20436 Modified Files: project.xml Log Message: The metadata server now tracks join/leaves of data services within a managed set of registrars and has a test case for this feature. Index: project.xml =================================================================== RCS file: /cvsroot/cweb/bigdata/project.xml,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** project.xml 15 Apr 2007 23:29:26 -0000 1.6 --- project.xml 23 Apr 2007 13:09:30 -0000 1.7 *************** *** 416,419 **** --- 416,425 ---- <url>http://www.jini.org/wiki/Category:Jini_Starter_Kit</url> </dependency> + <dependency><!-- contains ServiceDiscoveryManager. --> + <groupId>jini</groupId> + <artifactId>jsk-lib</artifactId> + <version>2.1</version> + <url>http://www.jini.org/wiki/Category:Jini_Starter_Kit</url> + </dependency> <dependency> <groupId>jini</groupId> |
From: Bryan T. <tho...@us...> - 2007-04-23 13:09:37
|
Update of /cvsroot/cweb/bigdata/lib In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv20436/lib Added Files: jsk-lib-2.1.jar Log Message: The metadata server now tracks join/leaves of data services within a managed set of registrars and has a test case for this feature. --- NEW FILE: jsk-lib-2.1.jar --- (This appears to be a binary file; contents omitted.) |
From: Bryan T. <tho...@us...> - 2007-04-23 13:09:35
|
Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv20436/src/test/com/bigdata/service Modified Files: TestDataServer0.java TestAll.java Added Files: AbstractServerTestCase.java TestMetadataServer0.java Log Message: The metadata server now tracks join/leaves of data services within a managed set of registrars and has a test case for this feature. --- NEW FILE: AbstractServerTestCase.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. 
Modifications: */ /* * Created on Apr 22, 2007 */ package com.bigdata.service; import java.io.IOException; import java.net.InetAddress; import junit.framework.AssertionFailedError; import junit.framework.TestCase2; import net.jini.core.discovery.LookupLocator; import net.jini.core.lookup.ServiceID; import net.jini.core.lookup.ServiceRegistrar; import net.jini.core.lookup.ServiceTemplate; /** * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public abstract class AbstractServerTestCase extends TestCase2 { /** * */ public AbstractServerTestCase() { } /** * @param arg0 */ public AbstractServerTestCase(String arg0) { super(arg0); } /** * Return the {@link ServiceID} of a server that we started ourselves. The * method waits until the {@link ServiceID} becomes available on * {@link AbstractServer#getServiceID()}. * * @exception AssertionFailedError * If the {@link ServiceID} can not be found after a timeout. * * @exception InterruptedException * if the thread is interrupted while it is waiting to retry. */ protected ServiceID getServiceID(AbstractServer server) throws AssertionFailedError, InterruptedException { ServiceID serviceID = null; for(int i=0; i<10 && serviceID == null; i++) { /* * Note: This can be null since the serviceID is not assigned * synchronously by the registrar. */ serviceID = server.getServiceID(); if(serviceID == null) { /* * We wait a bit and retry until we have it or timeout. */ Thread.sleep(200); } } assertNotNull("serviceID",serviceID); return serviceID; } /** * Lookup a {@link DataService} by its {@link ServiceID} using unicast * discovery on localhost. * * @param serviceID * The {@link ServiceID}. * * @return The service. * * @todo Modify to return the service item? * * @todo Modify to not be specific to {@link DataService} vs * {@link MetadataService} (we need a common base interface for both * that carries most of the functionality but allows us to make * distinctions easily during discovery). 
*/ public IDataService lookupDataService(ServiceID serviceID) throws IOException, ClassNotFoundException, InterruptedException { /* * Lookup the discovery service (unicast on localhost). */ // get the hostname. InetAddress addr = InetAddress.getLocalHost(); String hostname = addr.getHostName(); // Find the service registrar (unicast protocol). final int timeout = 4*1000; // milliseconds (4 seconds). System.err.println("hostname: "+hostname); LookupLocator lookupLocator = new LookupLocator("jini://"+hostname); ServiceRegistrar serviceRegistrar = lookupLocator.getRegistrar( timeout ); /* * Prepare a template for lookup search. * * Note: The client needs a local copy of the interface in order to be * able to invoke methods on the service without using reflection. The * implementation class will be downloaded from the codebase identified * by the server. */ ServiceTemplate template = new ServiceTemplate(// /* * use this to request the service by its serviceID. */ serviceID, /* * Use this to filter services by an interface that they expose. */ // new Class[] { IDataService.class }, null, /* * use this to filter for services by Entry attributes. */ null); /* * Lookup a service. This can fail if the service registrar has not * finished processing the service registration. If it does, you can * generally just retry the test and it will succeed. However this * points out that the client may need to wait and retry a few times if * you are starting everything up at once (or just register for * notification events for the service if it is not found and enter a * wait state). 
*/ IDataService service = null; for (int i = 0; i < 10 && service == null; i++) { service = (IDataService) serviceRegistrar .lookup(template /* , maxMatches */); if (service == null) { System.err.println("Service not found: sleeping..."); Thread.sleep(200); } } if(service!=null) { System.err.println("Service found."); } return service; } } --- NEW FILE: TestMetadataServer0.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. 
Modifications: */ /* * Created on Apr 22, 2007 */ package com.bigdata.service; import net.jini.core.lookup.ServiceID; /** * Test ability to launch, register, discover and use a {@link MetadataService} * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class TestMetadataServer0 extends AbstractServerTestCase { /** * */ public TestMetadataServer0() { } /** * @param arg0 */ public TestMetadataServer0(String arg0) { super(arg0); } MetadataServer metadataServer0; DataServer dataServer1; /** * Starts a {@link DataServer} ({@link #dataServer1}) and then a * {@link MetadataServer} ({@link #metadataServer0}). Each runs in its own * thread. */ public void setUp() throws Exception { /* * Start up a data server before the metadata server so that we can make * sure that it is detected by the metadata server once it starts up. */ dataServer1 = new DataServer( new String[] { "src/resources/config/standalone/DataServer1.config" }); new Thread() { public void run() { dataServer1.run(); } }.start(); /* * Start the metadata server. */ metadataServer0 = new MetadataServer( new String[] { "src/resources/config/standalone/MetadataServer0.config" }); new Thread() { public void run() { metadataServer0.run(); } }.start(); } /** * destroy the test services. */ public void tearDown() throws Exception { metadataServer0.destroy(); dataServer1.destroy(); } /** * Test the ability to discover the {@link MetadataService} and the ability * of the {@link MetadataServer} to track {@link DataService}s. * <p> * Note: We start a data service both before and after the metadata server * and verify that both wind up in the service cache and that the metadata * server itself does not wind up in the cache since it should be excluded * by the service item filter. 
*/ public void test_serverRunning() throws Exception { ServiceID dataService1ID = getServiceID(dataServer1); ServiceID metadataServiceID = getServiceID(metadataServer0); final IMetadataService proxy = (IMetadataService) lookupDataService(metadataServiceID); assertNotNull("service not discovered", proxy); /* * Start a data service and verify that the metadata service will * discover it. */ final DataServer dataServer0 = new DataServer( new String[] { "src/resources/config/standalone/DataServer0.config" }); ServiceID dataService0ID = null; try { new Thread() { public void run() { dataServer0.run(); } }.start(); /* * wait until we get the serviceID as an indication that the data * service is running. */ dataService0ID = getServiceID(dataServer0); /* * verify that both data services were discovered by the metadata * server. */ System.err.println("Sleeping"); Thread.sleep(500); assertNotNull(metadataServer0.getDataServiceByID(dataService0ID)); assertNotNull(metadataServer0.getDataServiceByID(dataService1ID)); assertEquals("#dataServices", 2, metadataServer0 .getDataServiceCount()); } finally { /* * Destroy one of the data services and verify that the metadata * server notices this event. 
*/ System.err.println("Destroying DataServer0"); dataServer0.destroy(); if (dataService0ID != null) { System.err.println("Sleeping"); Thread.sleep(500); assertEquals("#dataServices", 1, metadataServer0 .getDataServiceCount()); assertNull(metadataServer0.getDataServiceByID(dataService0ID)); assertNotNull(metadataServer0 .getDataServiceByID(dataService1ID)); } } } } Index: TestAll.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/service/TestAll.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** TestAll.java 20 Apr 2007 16:36:27 -0000 1.2 --- TestAll.java 23 Apr 2007 13:09:30 -0000 1.3 *************** *** 76,79 **** --- 76,85 ---- suite.addTestSuite( TestDataServer0.class ); + /* + * Test of a single client talking to a single metadata service + * instance. + */ + suite.addTestSuite( TestMetadataServer0.class ); + // suite.addTestSuite( TestServer.class ); // Does not implement TestCase. Index: TestDataServer0.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/service/TestDataServer0.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** TestDataServer0.java 21 Apr 2007 10:37:17 -0000 1.3 --- TestDataServer0.java 23 Apr 2007 13:09:30 -0000 1.4 *************** *** 48,66 **** package com.bigdata.service; - import java.io.IOException; - import java.net.InetAddress; import java.util.UUID; - import junit.framework.AssertionFailedError; - import junit.framework.TestCase2; - import net.jini.core.discovery.LookupLocator; import net.jini.core.lookup.ServiceID; - import net.jini.core.lookup.ServiceRegistrar; - import net.jini.core.lookup.ServiceTemplate; import com.bigdata.btree.IIndex; import com.bigdata.journal.IIndexStore; - /** * Test of client-server communications. 
The test starts a {@link DataServer} --- 48,58 ---- *************** *** 74,86 **** * and the service to agree on interface definitions, etc. You can use * <code>build.xml</code> in the root of this module to update that JAR. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ - * - * @todo write another test class that accesses more than one - * {@link DataService} instance, e.g., by placing a different index on - * each {@link DataService}. */ ! public class TestDataServer0 extends TestCase2 { /** --- 66,79 ---- * and the service to agree on interface definitions, etc. You can use * <code>build.xml</code> in the root of this module to update that JAR. + * <p> + * Note: You MUST grant sufficient permissions for the tests to execute, e.g., + * <pre> + * -Djava.security.policy=policy.all + * </pre> * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public class TestDataServer0 extends AbstractServerTestCase { /** *************** *** 97,104 **** } - String[] args = new String[]{ - "src/resources/config/standalone/DataServer0.config" - }; - DataServer dataServer0; --- 90,93 ---- *************** *** 106,110 **** public void setUp() throws Exception { ! dataServer0 = new DataServer(args); new Thread() { --- 95,101 ---- public void setUp() throws Exception { ! dataServer0 = new DataServer(new String[]{ ! "src/resources/config/standalone/DataServer0.config" ! }); new Thread() { *************** *** 130,174 **** /** - * Return the {@link ServiceID} of the specific service that we launched in - * #setUp(). - * - * @exception AssertionFailedError - * If the {@link ServiceID} can not be found after a timeout. - * - * @exception InterruptedException - * if the thread is interrupted while it is waiting to retry. 
- */ - protected ServiceID getServiceID() throws AssertionFailedError, InterruptedException { - - ServiceID serviceID = null; - - for(int i=0; i<10 && serviceID == null; i++) { - - /* - * Note: This can be null since the serviceID is not assigned - * synchonously by the registrar. - */ - - serviceID = dataServer0.getServiceID(); - - if(serviceID == null) { - - /* - * We wait a bit and retry until we have it or timeout. - */ - - Thread.sleep(200); - - } - - } - - assertNotNull("serviceID",serviceID); - - return serviceID; - - } - - /** * Exercises the basic features of the {@link IDataService} interface using * unisolated operations, including creating a B+Tree, insert, contains, --- 121,124 ---- *************** *** 177,186 **** * * @throws Exception - * - * @todo test {@link IDataService#submit(long, IProcedure)}. */ public void test_serverRunning() throws Exception { ! ServiceID serviceID = getServiceID(); final IDataService proxy = lookupDataService(serviceID); --- 127,134 ---- * * @throws Exception */ public void test_serverRunning() throws Exception { ! ServiceID serviceID = getServiceID(dataServer0); final IDataService proxy = lookupDataService(serviceID); *************** *** 308,393 **** } - /** - * Lookup a {@link DataService} by its {@link ServiceID}. - * - * @param serviceID - * The {@link ServiceID}. - */ - public IDataService lookupDataService(ServiceID serviceID) - throws IOException, ClassNotFoundException, InterruptedException { - - /* - * Lookup the discover service (unicast on localhost). - */ - - // get the hostname. - InetAddress addr = InetAddress.getLocalHost(); - String hostname = addr.getHostName(); - - // Find the service registrar (unicast protocol). - final int timeout = 4*1000; // seconds. - System.err.println("hostname: "+hostname); - LookupLocator lookupLocator = new LookupLocator("jini://"+hostname); - ServiceRegistrar serviceRegistrar = lookupLocator.getRegistrar( timeout ); - - /* - * Prepare a template for lookup search. 
- * - * Note: The client needs a local copy of the interface in order to be - * able to invoke methods on the service without using reflection. The - * implementation class will be downloaded from the codebase identified - * by the server. - */ - ServiceTemplate template = new ServiceTemplate(// - /* - * use this to request the service by its serviceID. - */ - serviceID, - /* - * Use this to filter services by an interface that they expose. - */ - // new Class[] { IDataService.class }, - null, - /* - * use this to filter for services by Entry attributes. - */ - null); - - /* - * Lookup a service. This can fail if the service registrar has not - * finished processing the service registration. If it does, you can - * generally just retry the test and it will succeed. However this - * points out that the client may need to wait and retry a few times if - * you are starting everthing up at once (or just register for - * notification events for the service if it is not found and enter a - * wait state). - */ - - IDataService service = null; - - for (int i = 0; i < 10 && service == null; i++) { - - service = (IDataService) serviceRegistrar - .lookup(template /* , maxMatches */); - - if (service == null) { - - System.err.println("Service not found: sleeping..."); - - Thread.sleep(200); - - } - - } - - if(service!=null) { - - System.err.println("Service found."); - - } - - return service; - - } - } --- 256,258 ---- |
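Both the `getServiceID()` retry loop and the `lookupDataService(...)` loop shown in this diff follow the same shape: poll, and if the result is still null (because registration with the registrar is asynchronous), sleep briefly and retry. A minimal self-contained sketch of that pattern; `RetryLookup` and `awaitNonNull` are hypothetical names, and the `Supplier` stands in for the registrar/server lookup call:

```java
import java.util.function.Supplier;

public class RetryLookup {

    // Poll a lookup until it yields a non-null result or the attempts are
    // exhausted, mirroring the sleep/retry loops in TestDataServer0.
    static <T> T awaitNonNull(Supplier<T> lookup, int maxTries, long sleepMillis)
            throws InterruptedException {

        for (int i = 0; i < maxTries; i++) {

            T result = lookup.get();

            if (result != null) {
                return result; // found (e.g., the ServiceID or service proxy).
            }

            // Not registered yet: wait a bit and retry, as the test does.
            Thread.sleep(sleepMillis);

        }

        return null; // the caller decides whether this is a test failure.
    }

}
```

As the removed comment notes, a production client might instead register for service-discovery notification events and enter a wait state rather than polling.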
From: Bryan T. <tho...@us...> - 2007-04-21 10:37:23
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv23696/src/java/com/bigdata/service Modified Files: DataServer.java IWritePipeline.java IDataService.java DataService.java DataServiceClient.java EmbeddedDataService.java IReadOnlyProcedure.java IProcedure.java Log Message: Wrote test case for IDataService#submit(...) and modified that method to return an optional result from the procedure. Index: DataServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServer.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** DataServer.java 20 Apr 2007 16:36:27 -0000 1.5 --- DataServer.java 21 Apr 2007 10:37:16 -0000 1.6 *************** *** 59,62 **** --- 59,67 ---- /** * The bigdata data server. + * <p> + * The {@link DataServer} starts the {@link DataService}. The server and + * service are configured using a {@link Configuration} file whose name is + * passed to the {@link DataServer#DataServer(String[])} constructor or + * {@link #main(String[])}. * * @see src/resources/config for sample configurations. *************** *** 86,91 **** /** ! * Starts a new {@link DataServer}. ! * * @param args * The name of the {@link Configuration} file for the service. --- 91,101 ---- /** ! * Starts a new {@link DataServer}. This can be done programmatically ! * by executing ! * <pre> ! * new DataServer(args).run(); ! * </pre> ! * within a {@link Thread}. ! * * @param args * The name of the {@link Configuration} file for the service. 
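The revised `DataServer` javadoc above says the server is started programmatically by executing `new DataServer(args).run()` within a `Thread`, i.e. `run()` blocks for the lifetime of the service. A self-contained sketch of that start/destroy pattern; `PlaceholderServer` is a hypothetical stand-in for `DataServer`, whose implementation is not shown in this diff:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Stand-in for DataServer: run() blocks until destroy() is called,
// mirroring "new DataServer(args).run()" executed within a Thread.
class PlaceholderServer implements Runnable {

    private final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch stopped = new CountDownLatch(1);

    public void run() {
        started.countDown();     // the service is now "up".
        try {
            stopped.await();     // block until destroy(), like DataServer#run().
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public boolean awaitStart(long ms) throws InterruptedException {
        return started.await(ms, TimeUnit.MILLISECONDS);
    }

    public void destroy() {
        stopped.countDown();
    }

}

public class ServerStartDemo {

    public static void main(String[] args) throws Exception {
        PlaceholderServer server = new PlaceholderServer();
        Thread t = new Thread(server);
        t.setDaemon(true);       // do not keep the JVM alive for the server.
        t.start();
        server.awaitStart(1000); // wait until the server is running.
        server.destroy();        // tear it down, as the test tearDown() does.
        t.join(1000);
    }

}
```

This is the same shape the test `setUp()` uses: construct the server, start its `run()` in a separate thread, then `destroy()` it in `tearDown()`.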
Index: IWritePipeline.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IWritePipeline.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** IWritePipeline.java 18 Mar 2007 22:29:43 -0000 1.1 --- IWritePipeline.java 21 Apr 2007 10:37:16 -0000 1.2 *************** *** 48,51 **** --- 48,53 ---- package com.bigdata.service; + import com.bigdata.rawstore.IRawStore; + /** * An interface used to pipeline writes against index partitions over one or *************** *** 60,63 **** --- 62,112 ---- * on which they are writing and the write requests should be chained down * the configured write pipeline. + * + * @todo verify that conditional insert logic cannot cause inconsistent data to + * appear depending on the order in which writes are received by the data + * services in a pipeline. For the pipeline to be consistent, the order in + * which client operations execute MUST NOT differ on different data + * services in the pipeline for the same index partition. Consider that + * two clients are loading RDF documents whose terms overlap. If the order + * of the client operations differs on the different services for the + * pipeline, then different term identifiers could be assigned to the same + * term by different data services. + * + * @todo commit (and group commit) semantics must be respected by the pipeline. + * This is basically a (potentially special) case of a 2-phase commit. If, + * e.g., the last data service in the pipeline suddenly runs out of + * disk or otherwise "hiccups", then it will not be consistent with + * the other replicas of the index partition in that pipeline. I need to + * think through how to handle this further. For example, the commit could + * propagate along the pipeline, but that does not work if group commit is + * triggered by different events (latency and data volume) on different + * data services. 
+ * <p> + * A distributed file system solves this problem by having only a single + * data service that is the chokepoint for concurrency control (and hence + * for consistency control) for any given index partition. Essentially, + * the distributed file system provides media redundancy - the same bytes + * in the same order appear in each copy of the file backing the journal. + * <p> + * So, it seems that a way to achieve that without a distributed file + * system is to have the write pipeline operate at the {@link IRawStore} + * API. It simply streams writes down to the next service in the pipeline + * and that is the sole way in which downstream services have their stores + * written. Since low-level writes can be 1 GB/sec on a transient buffer, + * this protocol could be separated from the data service and become a + * media redundancy protocol only. Downstream writes would not even need + * to sync to disk on "sync" but only on buffer overflow since the data + * service at the head of the pipeline is already providing restart-safe + * state and they are providing redundancy for the first point of failure. + * If the data service does fail, then the first media redundancy service + * would sync its state to disk and take over as the data service. + * <p> + * Work through how the service accepts responsibility for media + * redundancy for files, how it names its local files (source host / + * filename?), how replicated files are managed (close, closeAndDelete, + * bulk read, syncToDisk, etc.) + * <p> + * Work through how index partitions can be shed or picked up by data + * services in this media redundancy model. 
*/ public interface IWritePipeline { Index: IDataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IDataService.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** IDataService.java 20 Apr 2007 17:24:23 -0000 1.9 --- IDataService.java 21 Apr 2007 10:37:16 -0000 1.10 *************** *** 45,48 **** --- 45,49 ---- import java.io.IOException; + import java.io.Serializable; import java.util.UUID; import java.util.concurrent.ExecutionException; *************** *** 299,359 **** throws InterruptedException, ExecutionException, IOException, IOException; ! ! // /** ! // * Typesafe enum for flags that control the behavior of ! // * {@link IDataService#rangeQuery(long, String, byte[], byte[], int, int)} ! // * ! // * @author <a href="mailto:tho...@us...">Bryan Thompson</a> ! // * @version $Id$ ! // */ ! // public static enum RangeQueryEnum { ! // ! // /** ! // * Flag specifies that keys in the key range will be returned. When not ! // * given, the keys will NOT be included in the {@link ResultSetChunk}s ! // * sent to the client. ! // */ ! // Keys(1 << 1), ! // ! // /** ! // * Flag specifies that values in the key range will be returned. When ! // * not given, the values will NOT be included in the ! // * {@link ResultSetChunk}s sent to the client. ! // */ ! // Values(1 << 2); ! // ! // private final int flag; ! // ! // private RangeQueryEnum(int flag) { ! // ! // this.flag = flag; ! // ! // } ! // ! // /** ! // * True iff this flag is set. ! // * ! // * @param flags ! // * An integer on which zero or more flags have been set. ! // * ! // * @return True iff this flag is set. ! // */ ! // public boolean isSet(int flags) { ! // ! // return (flags & flag) == 1; ! // ! // } ! // ! // /** ! // * The bit mask for this flag. ! // */ ! // public final int valueOf() { ! // ! // return flag; ! // ! // } ! // ! // }; ! 
/** * <p> --- 300,304 ---- throws InterruptedException, ExecutionException, IOException, IOException; ! /** * <p> *************** *** 385,401 **** * The procedure to be executed. * * @throws IOException * @throws InterruptedException * @throws ExecutionException */ ! public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException; - // /** - // * Execute a map worker task against all key/value pairs in a key range, - // * writing the results onto N partitions of an intermediate file. - // */ - // public void map(long tx, String name, byte[] fromKey, byte[] toKey, - // IMapOp op) throws InterruptedException, ExecutionException; - } --- 330,344 ---- * The procedure to be executed. * + * @return The result, which is entirely defined by the procedure + * implementation and which MAY be null. In general, this MUST be + * {@link Serializable} since it may have to pass across a network + * interface. + * * @throws IOException * @throws InterruptedException * @throws ExecutionException */ ! public Object submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException; } Index: DataServiceClient.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServiceClient.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** DataServiceClient.java 20 Apr 2007 17:24:24 -0000 1.7 --- DataServiceClient.java 21 Apr 2007 10:37:16 -0000 1.8 *************** *** 112,118 **** // } ! public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException { ! delegate.submit(tx, proc); ! } public void abort(long tx) throws IOException { --- 112,118 ---- // } ! // public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException { ! // delegate.submit(tx, proc); ! 
// } public void abort(long tx) throws IOException { Index: IProcedure.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IProcedure.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** IProcedure.java 15 Mar 2007 16:11:11 -0000 1.1 --- IProcedure.java 21 Apr 2007 10:37:16 -0000 1.2 *************** *** 1,45 **** /** ! The Notice below must appear in each file of the Source Code of any ! copy you distribute of the Licensed Product. Contributors to any ! Modifications may add their own copyright notices to identify their ! own contributions. ! License: ! The contents of this file are subject to the CognitiveWeb Open Source ! License Version 1.1 (the License). You may not copy or use this file, ! in either source code or executable form, except in compliance with ! the License. You may obtain a copy of the License from ! http://www.CognitiveWeb.org/legal/license/ ! Software distributed under the License is distributed on an AS IS ! basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See ! the License for the specific language governing rights and limitations ! under the License. ! Copyrights: ! Portions created by or assigned to CognitiveWeb are Copyright ! (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact ! information for CognitiveWeb is available at ! http://www.CognitiveWeb.org ! Portions Copyright (c) 2002-2003 Bryan Thompson. ! Acknowledgements: ! Special thanks to the developers of the Jabber Open Source License 1.0 ! (JOSL), from which this License was derived. This License contains ! terms that differ from JOSL. ! Special thanks to the CognitiveWeb Open Source Contributors for their ! suggestions and support of the Cognitive Web. ! Modifications: ! */ /* * Created on Mar 15, 2007 --- 1,45 ---- /** ! The Notice below must appear in each file of the Source Code of any ! copy you distribute of the Licensed Product. 
Contributors to any ! Modifications may add their own copyright notices to identify their ! own contributions. ! License: ! The contents of this file are subject to the CognitiveWeb Open Source ! License Version 1.1 (the License). You may not copy or use this file, ! in either source code or executable form, except in compliance with ! the License. You may obtain a copy of the License from ! http://www.CognitiveWeb.org/legal/license/ ! Software distributed under the License is distributed on an AS IS ! basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See ! the License for the specific language governing rights and limitations ! under the License. ! Copyrights: ! Portions created by or assigned to CognitiveWeb are Copyright ! (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact ! information for CognitiveWeb is available at ! http://www.CognitiveWeb.org ! Portions Copyright (c) 2002-2003 Bryan Thompson. ! Acknowledgements: ! Special thanks to the developers of the Jabber Open Source License 1.0 ! (JOSL), from which this License was derived. This License contains ! terms that differ from JOSL. ! Special thanks to the CognitiveWeb Open Source Contributors for their ! suggestions and support of the Cognitive Web. ! Modifications: ! */ /* * Created on Mar 15, 2007 *************** *** 48,51 **** --- 48,53 ---- package com.bigdata.service; + import java.io.Serializable; + import com.bigdata.journal.IIndexStore; import com.bigdata.journal.IJournal; *************** *** 54,62 **** /** * A procedure to be executed on an {@link IDataService}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public interface IProcedure { /** --- 56,72 ---- /** * A procedure to be executed on an {@link IDataService}. + * <p> + * Note: while this interface is {@link Serializable}, that provides only for + * communicating state to the {@link IDataService}. 
If an instance of this + * procedure will cross a network interface, then the implementation Class MUST + * be available to the {@link IDataService} on which it will execute. This can + * be as simple as bundling the procedure into a JAR that is part of the + * CLASSPATH used to start a {@link DataService} or you can use downloaded code + * with the JINI codebase mechanism. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public interface IProcedure extends Serializable { /** *************** *** 76,81 **** * {@link ITx}. If the procedure is running unisolated, then * this will be an {@link IJournal}. */ ! public void apply(long tx,IIndexStore store); ! } --- 86,96 ---- * {@link ITx}. If the procedure is running unisolated, then * this will be an {@link IJournal}. + * + * @return The result, which is entirely defined by the procedure + * implementation and which MAY be null. In general, this MUST be + * {@link Serializable} since it may have to pass across a network + * interface. */ ! public Object apply(long tx, IIndexStore store); ! } Index: EmbeddedDataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/EmbeddedDataService.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** EmbeddedDataService.java 20 Apr 2007 17:24:24 -0000 1.6 --- EmbeddedDataService.java 21 Apr 2007 10:37:16 -0000 1.7 *************** *** 121,127 **** // } ! public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException { ! delegate.submit(tx, proc); ! } public void abort(long tx) throws IOException { --- 121,127 ---- // } ! // public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException { ! // delegate.submit(tx, proc); ! 
// } public void abort(long tx) throws IOException { Index: IReadOnlyProcedure.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IReadOnlyProcedure.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** IReadOnlyProcedure.java 15 Mar 2007 16:11:10 -0000 1.1 --- IReadOnlyProcedure.java 21 Apr 2007 10:37:16 -0000 1.2 *************** *** 48,52 **** package com.bigdata.service; ! public interface IReadOnlyProcedure { } \ No newline at end of file --- 48,59 ---- package com.bigdata.service; ! /** ! * Procedures that implement this marker interface will be executed within a ! * read-only context and MUST NOT attempt to write on an index. ! * ! * @author <a href="mailto:tho...@us...">Bryan Thompson</a> ! * @version $Id$ ! */ ! public interface IReadOnlyProcedure extends IProcedure { } \ No newline at end of file Index: DataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataService.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** DataService.java 20 Apr 2007 17:24:23 -0000 1.9 --- DataService.java 21 Apr 2007 10:37:16 -0000 1.10 *************** *** 49,53 **** import java.io.IOException; - import java.io.Serializable; import java.net.InetSocketAddress; import java.rmi.Remote; --- 49,52 ---- *************** *** 71,74 **** --- 70,75 ---- import com.bigdata.isolation.UnisolatedBTree; import com.bigdata.journal.AbstractJournal; + import com.bigdata.journal.IAtomicStore; + import com.bigdata.journal.ICommitter; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; *************** *** 76,81 **** /** ! * An implementation of a data service suitable for use with RPC, direct client ! * calls (if decoupled by an operation queue), or a NIO interface. * <p> * This implementation is thread-safe. It will block for each operation. 
It MUST --- 77,82 ---- /** ! * An implementation of a network-capable {@link IDataService}. The service is ! * started using the {@link DataServer} class. * <p> * This implementation is thread-safe. It will block for each operation. It MUST *************** *** 101,113 **** * an NIO interface to the data service. * ! * @todo add assertOpen() throughout ! * ! * @todo declare interface for managing service shutdown()/shutdownNow()? * * @todo support group commit for unisolated writes. i may have to refactor some * to get group commit to work for both transaction commits and unisolated * writes. basically, the tasks on the ! * {@link AbstractJournal#writeService write service} need to get ! * aggregated. * * @todo implement NIODataService, RPCDataService(possible), EmbeddedDataService --- 102,135 ---- * an NIO interface to the data service. * ! * @todo Note that "auto-commit" is provided for unisolated writes. This relies ! * on two things. First, only the {@link UnisolatedBTree} recoverable from ! * {@link AbstractJournal#getIndex(String)} is mutable - all other ways to ! * recover the named index will return a read-only view of a historical ! * committed state. Second, an explicit {@link IAtomicStore#commit()} must ! * be performed to make the changes restart-safe. The commit can only be ! * performed when no unisolated write is executing (even presuming that ! * different mutable btrees can receive writes concurrently) since it will ! * cause all dirty {@link ICommitter} to become restart-safe. Group commit ! * is essential to high throughput when unisolated writes are relatively ! * small. * * @todo support group commit for unisolated writes. i may have to refactor some * to get group commit to work for both transaction commits and unisolated * writes. basically, the tasks on the ! * {@link AbstractJournal#writeService} need to get aggregated (or each ! * commit examines the length of the write queue (basically, is there ! 
* another write in the queue or is this the last one), latency and data ! * volumn since the last commit and makes a decision whether or not to ! * commit at that time; if the commit is deferred, then it is placed onto ! * a queue of operations that have not finished and for which we can not ! * yet report "success" - even though additional unisolated writes must ! * continue to run; we also need to make sure that a commit will occur at ! * the first opportunity following the minimum latency -- even if no ! * unisolated writes are scheduled (or a single client would hang waiting ! * for a commit). ! * ! * @todo add assertOpen() throughout ! * ! * @todo declare interface for managing service shutdown()/shutdownNow()? * * @todo implement NIODataService, RPCDataService(possible), EmbeddedDataService *************** *** 155,159 **** * bi-directional transfer? * ! * @todo We will use non-blocking I/O for the page transfer protocol. Review * options to secure that protocol since we can not use JERI for that * purpose. For example, using SSL or an SSH tunnel. For most purposes I --- 177,182 ---- * bi-directional transfer? * ! * @todo We will use non-blocking I/O for the data transfer protocol in order to ! * support an efficient pipelining of writes across data services. Review * options to secure that protocol since we can not use JERI for that * purpose. For example, using SSL or an SSH tunnel. For most purposes I *************** *** 458,462 **** } ! public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException { --- 481,485 ---- } ! public Object submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException { *************** *** 469,477 **** if(isolated) { ! txService.submit(new TxProcedureTask(tx,proc)).get(); } else if( readOnly ) { ! readService.submit(new UnisolatedReadProcedureTask(proc)).get(); } else { --- 492,519 ---- if(isolated) { ! 
return txService.submit(new TxProcedureTask(tx,proc)).get(); } else if( readOnly ) { ! /* ! * FIXME The IReadOnlyInterface is a promise that the procedure will ! * not write on an index (or anything on the store), but it is NOT a ! * guarentee. Consider removing that interface and the option to run ! * unisolated as anything but "read/write" since a "read-only" that ! * in fact attempted to write could cause problems with the index ! * data structure. Alternatively, examine other means for running a ! * "read-only" unisolated store that enforces read-only semantics. ! * ! * For example, since the IIndexStore defines only a single method, ! * getIndex(String), we could provide an implementation of that ! * method that always selected a historical committed state for the ! * index. This would make writes impossible since they would be ! * rejected by the index object itself. ! * ! * Fix this and write tests that demonstrate that writes are ! * rejected if the proc implements IReadOnlyProcedure. ! */ ! ! return readService.submit(new UnisolatedReadProcedureTask(proc)).get(); } else { *************** *** 481,485 **** * complete writes MUST be committed. */ ! journal.serialize(new UnisolatedReadWriteProcedureTask(proc)).get(); } --- 523,527 ---- * complete writes MUST be committed. */ ! return journal.serialize(new UnisolatedReadWriteProcedureTask(proc)).get(); } *************** *** 511,523 **** /** * ! * FIXME the iterator needs to be aware of the defintion of a "row" for the ! * sparse row store so that we can respect the atomic guarentee for reads as ! * well as writes. * ! * FIXME support filters. there are a variety of use cases from clients that ! * are aware of version counters and delete markers to clients that encode a ! * column name and datum or write time into the key to those that will ! * filter based on inspection of the value associated with the key, e.g., ! * only values having some attribute. 
* * @todo if we allow the filter to cause mutations (e.g., deleting matching --- 553,565 ---- /** * ! * @todo the iterator needs to be aware of the definition of a "row" for the ! * sparse row store so that we can respect the atomic guarantee for ! * reads as well as writes. * ! * @todo support filters. there are a variety of use cases from clients that ! * are aware of version counters and delete markers to clients that ! * encode a column name and datum or write time into the key to those ! * that will filter based on inspection of the value associated with ! * the key, e.g., only values having some attribute. * * @todo if we allow the filter to cause mutations (e.g., deleting matching *************** *** 1099,1105 **** public Object call() throws Exception { ! proc.apply(tx.getStartTimestamp(),tx); ! ! return null; } --- 1141,1145 ---- public Object call() throws Exception { ! return proc.apply(tx.getStartTimestamp(),tx); } *************** *** 1123,1129 **** public Object call() throws Exception { ! proc.apply(0L,journal); ! ! return null; } --- 1163,1167 ---- public Object call() throws Exception { ! return proc.apply(0L,journal); } *************** *** 1155,1166 **** } ! public Long call() throws Exception { try { ! super.call(); // commit (synchronous, immediate). ! return journal.commit(); } catch(Throwable t) { --- 1193,1206 ---- } ! public Object call() throws Exception { try { ! Object result = super.call(); // commit (synchronous, immediate). ! journal.commit(); ! ! return result; } catch(Throwable t) {
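The rewritten task classes in this hunk all take the same shape: apply the procedure, make the write restart-safe with a synchronous commit where required, and only then hand the procedure's (possibly null) result back to the caller. A self-contained sketch of the unisolated read-write path; `Procedure`, `Store`, and `SubmitDemo` are hypothetical stand-ins for `IProcedure`, the journal, and the `journal.serialize(...)` execution:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stand-in for IProcedure: returns an optional, possibly null result.
interface Procedure {
    Object apply(long tx, Store store);
}

// Stand-in for the journal: tracks whether a commit made writes durable.
class Store {
    int value;            // some mutable state touched by procedures.
    int commitCounter;    // incremented by each commit().
    long commit() { return ++commitCounter; }
}

public class SubmitDemo {

    // Single-threaded executor: unisolated writes are fully serialized,
    // analogous to journal.serialize(...) in DataService.
    private final ExecutorService writeService =
            Executors.newSingleThreadExecutor();

    final Store store = new Store();

    // Mirrors the read-write task's call(): run the procedure, commit
    // synchronously, then return the procedure's result to the client.
    public Object submit(final Procedure proc) throws Exception {
        return writeService.submit(new Callable<Object>() {
            public Object call() {
                Object result = proc.apply(0L, store);
                store.commit(); // commit (synchronous, immediate).
                return result;
            }
        }).get();
    }

    public void shutdown() { writeService.shutdown(); }

}
```

The key change captured by the diff is that the commit result is no longer what the caller sees: the commit happens for durability, and the procedure's own result object is returned.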
From: Bryan T. <tho...@us...> - 2007-04-21 10:37:21

Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv23696/src/test/com/bigdata/service Modified Files: TestDataServer0.java Log Message: Wrote test case for IDataService#submit(...) and modified that method to return an optional result from the procedure. Index: TestDataServer0.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/service/TestDataServer0.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** TestDataServer0.java 20 Apr 2007 17:24:24 -0000 1.2 --- TestDataServer0.java 21 Apr 2007 10:37:17 -0000 1.3 *************** *** 59,62 **** --- 59,65 ---- import net.jini.core.lookup.ServiceTemplate; + import com.bigdata.btree.IIndex; + import com.bigdata.journal.IIndexStore; + /** *************** *** 181,185 **** ServiceID serviceID = getServiceID(); ! IDataService proxy = lookupDataService(serviceID); assertNotNull("service not discovered",proxy); --- 184,188 ---- ServiceID serviceID = getServiceID(); ! final IDataService proxy = lookupDataService(serviceID); assertNotNull("service not discovered",proxy); *************** *** 256,261 **** assertEquals(null,values[1]); ! proxy.dropIndex(name); } --- 259,309 ---- assertEquals(null,values[1]); ! /* ! * run a server local procedure. ! */ ! { ! ! IProcedure proc = new RangeCountProcedure(name); ! ! /* ! * Note: The result is ONE (1) since there is one deleted entry in ! * the UnisolatedBTree and rangeCount does not correct for deletion ! * markers! ! */ ! assertEquals("result", 1, proxy.submit(IDataService.UNISOLATED, ! proc)); ! ! } + proxy.dropIndex(name); + + } + + /** + * This procedure just computes a range count on the index. 
+ */ + private static class RangeCountProcedure implements IProcedure { + + private static final long serialVersionUID = 5856712176446915328L; + + private final String name; + + public RangeCountProcedure(String name) { + + if (name == null) + throw new IllegalArgumentException(); + + this.name = name; + + } + + public Object apply(long tx, IIndexStore store) { + + IIndex ndx = store.getIndex(name); + + return new Integer(ndx.rangeCount(null, null)); + + } + } |
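The asserted result of ONE (1) in the new test is worth dwelling on: `rangeCount` reports the raw tuple count, including deletion markers, so it is an upper bound on the number of undeleted entries rather than an exact count. A self-contained sketch of that behaviour; `TinyIndex` is a hypothetical stand-in for `UnisolatedBTree`:

```java
import java.util.TreeMap;

// Stand-in for UnisolatedBTree: remove() writes a deletion marker
// (a null value) rather than erasing the entry outright.
class TinyIndex {

    private final TreeMap<String, byte[]> tuples = new TreeMap<>();

    void insert(String key, byte[] val) { tuples.put(key, val); }

    void remove(String key) { tuples.put(key, null); } // deletion marker.

    // Mirrors rangeCount(null, null): counts tuples INCLUDING deletion
    // markers, so it does not "correct" for deleted entries.
    int rangeCount() { return tuples.size(); }

    // A scan that respects deletion markers yields the exact count.
    int exactCount() {
        return (int) tuples.values().stream()
                .filter(v -> v != null).count();
    }

}
```

This is why the test can see a non-zero range count for an index whose only entry was removed earlier in the test.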
From: Bryan T. <tho...@us...> - 2007-04-20 17:24:28
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv1508/src/java/com/bigdata/service Modified Files: IDataService.java DataService.java DataServiceClient.java EmbeddedDataService.java Added Files: ResultSet.java Log Message: Implemented Externalizable for ResultSet and moved it to a top-level class. Index: IDataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IDataService.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** IDataService.java 20 Apr 2007 16:36:27 -0000 1.8 --- IDataService.java 20 Apr 2007 17:24:23 -0000 1.9 *************** *** 52,56 **** import com.bigdata.journal.ITransactionManager; import com.bigdata.journal.ITxCommitProtocol; - import com.bigdata.service.DataService.ResultSet; /** --- 52,55 ---- Index: EmbeddedDataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/EmbeddedDataService.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** EmbeddedDataService.java 20 Apr 2007 16:36:27 -0000 1.5 --- EmbeddedDataService.java 20 Apr 2007 17:24:24 -0000 1.6 *************** *** 56,60 **** import com.bigdata.btree.IBatchOp; import com.bigdata.journal.ValidationError; - import com.bigdata.service.DataService.ResultSet; import com.bigdata.util.concurrent.DaemonThreadFactory; --- 56,59 ---- Index: DataServiceClient.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServiceClient.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** DataServiceClient.java 20 Apr 2007 16:36:27 -0000 1.6 --- DataServiceClient.java 20 Apr 2007 17:24:24 -0000 1.7 *************** *** 54,58 **** import com.bigdata.btree.IBatchOp; import 
com.bigdata.journal.ValidationError; - import com.bigdata.service.DataService.ResultSet; /** --- 54,57 ---- Index: DataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataService.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** DataService.java 20 Apr 2007 16:36:27 -0000 1.8 --- DataService.java 20 Apr 2007 17:24:23 -0000 1.9 *************** *** 48,54 **** package com.bigdata.service; - import java.io.Externalizable; import java.io.IOException; - import java.io.ObjectOutputStream; import java.io.Serializable; import java.net.InetSocketAddress; --- 48,52 ---- *************** *** 65,72 **** import com.bigdata.btree.BatchLookup; import com.bigdata.btree.BatchRemove; - import com.bigdata.btree.BytesUtil; import com.bigdata.btree.IBatchBTree; import com.bigdata.btree.IBatchOp; - import com.bigdata.btree.IEntryIterator; import com.bigdata.btree.IIndex; import com.bigdata.btree.ILinearList; --- 63,68 ---- *************** *** 1033,1157 **** /** - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - * - * @todo implement {@link Externalizable}, probably buffer the - * {@link ObjectOutputStream} and focus on byte[] transfers. - */ - public static class ResultSet implements Serializable { - - /** - * Total #of key-value pairs within the key range (approximate). - */ - public final int rangeCount; - - /** - * Actual #of key-value pairs in the {@link ResultSet} - */ - public final int ntuples; - - /** - * True iff the iterator exhausted the available keys such that no more - * results would be available if you formed the successor of the - * {@link #lastKey}. - */ - final public boolean exhausted; - - /** - * The last key visited by the iterator <em>regardless</em> of the - * filter imposed -or- <code>null</code> iff no keys were visited by - * the iterator for the specified key range. 
- * - * @see #nextKey() - */ - public final byte[] lastKey; - - /** - * The next key that should be used to retrieve keys and/or values - * starting from the first possible successor of the {@link #lastKey} - * visited by the iterator in this operation (the successor is formed by - * appending a <code>nul</code> byte to the {@link #lastKey}). - * - * @return The successor of {@link #lastKey} -or- <code>null</code> - * iff the iterator exhausted the available keys. - * - * @exception UnsupportedOperationException - * if the {@link #lastKey} is <code>null</code>. - */ - public byte[] nextKey() { - - if (lastKey == null) - throw new UnsupportedOperationException(); - - return BytesUtil.successor(lastKey); - - } - - /** - * The visited keys iff the {@link RangeQueryEnum#Keys} flag was set. - */ - public final byte[][] keys; - - /** - * The visited values iff the {@link RangeQueryEnum#Values} flag was - * set. - */ - public final byte[][] vals; - - public ResultSet(final IIndex ndx, final byte[] fromKey, - final byte[] toKey, final int capacity, final boolean sendKeys, - final boolean sendVals) { - - // The upper bound on the #of key-value pairs in the range. - rangeCount = ndx.rangeCount(fromKey, toKey); - - final int limit = (rangeCount > capacity ? capacity : rangeCount); - - int ntuples = 0; - - keys = (sendKeys ? new byte[limit][] : null); - - vals = (sendVals ? new byte[limit][] : null); - - // iterator that will visit the key range. - IEntryIterator itr = ndx.rangeIterator(fromKey, toKey); - - /* - * true if any keys were visited regardless of whether or not they - * satisified the optional filter. This is used to make sure that we - * always return the lastKey visited if any keys were visited and - * otherwise set lastKey := null. 
- */ - boolean anything = false; - - while (ntuples < limit && itr.hasNext()) { - - anything = true; - - byte[] val = (byte[]) itr.next(); - - if (sendVals) - vals[ntuples] = val; - - if (sendKeys) - keys[ntuples] = itr.getKey(); - - // #of results that will be returned. - ntuples++; - - } - - this.ntuples = ntuples; - - this.lastKey = (anything ? itr.getKey() : null); - - this.exhausted = ! itr.hasNext(); - - } - - } - - /** * Abstract class for tasks that execute {@link IProcedure} operations. * There are various concrete subclasses, each of which MUST be submitted to --- 1029,1032 ---- --- NEW FILE: ResultSet.java --- package com.bigdata.service; import java.io.DataOutput; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import org.CognitiveWeb.extser.LongPacker; import org.CognitiveWeb.extser.ShortPacker; import com.bigdata.btree.BytesUtil; import com.bigdata.btree.IEntryIterator; import com.bigdata.btree.IIndex; /** * An object used to stream key scan results back to the client. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class ResultSet implements Externalizable { /** * */ private static final long serialVersionUID = 8823205844134046434L; private int rangeCount; private int ntuples; private boolean exhausted; private byte[] lastKey; private byte[][] keys; private byte[][] vals; /** * Total #of key-value pairs within the key range (approximate). */ public int getRangeCount() {return rangeCount;} /** * Actual #of key-value pairs in the {@link ResultSet} */ public int getNumTuples() {return ntuples;} /** * True iff the iterator exhausted the available keys such that no more * results would be available if you formed the successor of the * {@link #lastKey}. 
*/ public boolean isExhausted() {return exhausted;} /** * The last key visited by the iterator <em>regardless</em> of the * filter imposed -or- <code>null</code> iff no keys were visited by * the iterator for the specified key range. * * @see #successor() */ public byte[] getLastKey() {return lastKey;} /** * The next key that should be used to retrieve keys and/or values starting * from the first possible successor of the {@link #getLastKey()} visited by * the iterator in this operation (the successor is formed by appending a * <code>nul</code> byte to the {@link #getLastKey()}). * * @return The successor of {@link #getLastKey()} -or- <code>null</code> * iff the iterator exhausted the available keys. * * @exception UnsupportedOperationException * if the {@link #lastKey} is <code>null</code>. */ public byte[] successor() { if (lastKey == null) throw new UnsupportedOperationException(); return BytesUtil.successor(lastKey); } /** * The visited keys iff the keys were requested. */ public byte[][] getKeys() {return keys;} /** * The visited values iff the values were requested. */ public byte[][] getValues() {return vals;} /** * Deserialization constructor. */ public ResultSet() {} /** * Constructor used by the {@link DataService} to populate the * {@link ResultSet}. * * @param ndx * @param fromKey * @param toKey * @param capacity * @param sendKeys * @param sendVals */ public ResultSet(final IIndex ndx, final byte[] fromKey, final byte[] toKey, final int capacity, final boolean sendKeys, final boolean sendVals) { // The upper bound on the #of key-value pairs in the range. rangeCount = ndx.rangeCount(fromKey, toKey); final int limit = (rangeCount > capacity ? capacity : rangeCount); int ntuples = 0; keys = (sendKeys ? new byte[limit][] : null); vals = (sendVals ? new byte[limit][] : null); // iterator that will visit the key range. 
IEntryIterator itr = ndx.rangeIterator(fromKey, toKey); /* * true if any keys were visited regardless of whether or not they * satisified the optional filter. This is used to make sure that we * always return the lastKey visited if any keys were visited and * otherwise set lastKey := null. */ boolean anything = false; while (ntuples < limit && itr.hasNext()) { anything = true; byte[] val = (byte[]) itr.next(); if (sendVals) vals[ntuples] = val; if (sendKeys) keys[ntuples] = itr.getKey(); // #of results that will be returned. ntuples++; } this.ntuples = ntuples; this.lastKey = (anything ? itr.getKey() : null); this.exhausted = ! itr.hasNext(); } protected static short VERSION0 = 0x0; public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { final short version = ShortPacker.unpackShort(in); if (version != VERSION0) throw new IOException("Unknown version=" + version); rangeCount = (int) LongPacker.unpackLong(in); ntuples = (int)LongPacker.unpackLong(in); exhausted = in.readBoolean(); final boolean haveKeys = in.readBoolean(); final boolean haveVals = in.readBoolean(); final int lastKeySize = (int)LongPacker.unpackLong(in); if(lastKeySize!=0) { lastKey = new byte[lastKeySize]; in.readFully(lastKey); } else { lastKey = null; } if(haveKeys) { keys = new byte[ntuples][]; for(int i=0;i<ntuples;i++) { int size = (int)LongPacker.unpackLong(in); byte[] tmp = new byte[size]; in.readFully(tmp); keys[i] = tmp; } } else { keys = null; } if(haveVals) { vals = new byte[ntuples][]; for(int i=0;i<ntuples;i++) { int size = (int)LongPacker.unpackLong(in); byte[] tmp = new byte[size]; in.readFully(tmp); vals[i] = tmp; } } else { vals = null; } } public void writeExternal(ObjectOutput out) throws IOException { /* * @todo once I have some benchmarks for the data service protocol, * try the changes commented out below and see if the performance is * better when I explicitly buffer the writes. 
*/ // ByteArrayOutputStream baos = new ByteArrayOutputStream(100 + ntuples // * 512); // DataOutput dos = new DataOutputStream(baos); DataOutput dos = out; ShortPacker.packShort(dos, VERSION0); LongPacker.packLong(dos, rangeCount); LongPacker.packLong(dos,ntuples); dos.writeBoolean(exhausted); dos.writeBoolean(keys!=null); dos.writeBoolean(vals!=null); LongPacker.packLong(dos,lastKey==null?0:lastKey.length); if(lastKey!=null) { dos.write(lastKey); } if(keys!=null) { for(int i=0; i<ntuples; i++) { LongPacker.packLong(dos, keys[i].length); dos.write(keys[i]); } } if(vals!=null) { for(int i=0; i<ntuples; i++) { LongPacker.packLong(dos, vals[i].length); dos.write(vals[i]); } } // out.write(baos.toByteArray()); } } |
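The `ResultSet` wire format above length-prefixes every key and value, using the extser `LongPacker`/`ShortPacker` classes to pack lengths into a variable number of bytes. A minimal sketch of the same framing idea, using fixed-width `int` prefixes from the JDK instead of the bigdata-specific packers (class and method names here are illustrative, not part of the bigdata API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WireFormatSketch {

    // Write a byte[][] as a count followed by length-prefixed rows.
    // The real ResultSet packs these lengths into fewer bytes.
    static byte[] encode(byte[][] rows) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeInt(rows.length);
        for (byte[] row : rows) {
            dos.writeInt(row.length);
            dos.write(row);
        }
        dos.flush();
        return baos.toByteArray();
    }

    // Read the count, then each length-prefixed row, mirroring the
    // readFully() loops in ResultSet#readExternal.
    static byte[][] decode(byte[] data) throws IOException {
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
        int n = dis.readInt();
        byte[][] rows = new byte[n][];
        for (int i = 0; i < n; i++) {
            int len = dis.readInt();
            rows[i] = new byte[len];
            dis.readFully(rows[i]);
        }
        return rows;
    }
}
```

The commented-out `ByteArrayOutputStream` buffering in `writeExternal` would follow the same pattern as `encode` above: serialize into an in-memory buffer first, then hand the backing byte[] to the `ObjectOutput` in one call.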
From: Bryan T. <tho...@us...> - 2007-04-20 17:24:28
Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv1508/src/test/com/bigdata/service Modified Files: TestDataServer0.java Log Message: Implemented Externalizable for ResultSet and moved it to a top-level class. Index: TestDataServer0.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/service/TestDataServer0.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TestDataServer0.java 20 Apr 2007 16:36:27 -0000 1.1 --- TestDataServer0.java 20 Apr 2007 17:24:24 -0000 1.2 *************** *** 59,63 **** import net.jini.core.lookup.ServiceTemplate; - import com.bigdata.service.DataService.ResultSet; /** --- 59,62 ---- *************** *** 175,178 **** --- 174,179 ---- * * @throws Exception + * + * @todo test {@link IDataService#submit(long, IProcedure)}. */ public void test_serverRunning() throws Exception { *************** *** 226,237 **** null, 100, flags ); ! assertEquals("rangeCount",1,rset.rangeCount); ! assertEquals("ntuples",1,rset.ntuples); ! assertTrue("exhausted",rset.exhausted); ! assertEquals("lastKey",new byte[]{1},rset.lastKey); ! assertEquals("keys.length",1,rset.keys.length); ! assertEquals(new byte[]{1},rset.keys[0]); ! assertEquals("vals.length",1,rset.vals.length); ! assertEquals(new byte[]{1},rset.vals[0]); // remove key that exists, verifying the returned values. --- 227,240 ---- null, 100, flags ); ! assertEquals("rangeCount",1,rset.getRangeCount()); ! assertEquals("ntuples",1,rset.getNumTuples()); ! assertTrue("exhausted",rset.isExhausted()); ! assertEquals("lastKey",new byte[]{1},rset.getLastKey()); ! assertNotNull("keys",rset.getKeys()); ! assertEquals("keys.length",1,rset.getKeys().length); ! assertEquals(new byte[]{1},rset.getKeys()[0]); ! assertNotNull("vals",rset.getValues()); ! assertEquals("vals.length",1,rset.getValues().length); ! 
assertEquals(new byte[]{1},rset.getValues()[0]); // remove key that exists, verifying the returned values. |
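The test above exercises one page of a range scan; the javadoc for `ResultSet#successor()` describes how a client resumes the next page, namely from the successor of `getLastKey()`, formed by appending a nul byte. A self-contained sketch of that successor operation and the unsigned key comparison it relies on (method names are illustrative stand-ins for the `BytesUtil` helpers, not their actual signatures):

```java
import java.util.Arrays;

public class SuccessorSketch {

    // Successor of a key per the ResultSet javadoc: append a nul (0x00)
    // byte, yielding the smallest key that sorts strictly after it.
    static byte[] successor(byte[] key) {
        return Arrays.copyOf(key, key.length + 1); // copyOf pads with 0x00
    }

    // Unsigned lexicographic comparison, the order B+Tree keys use.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;
    }
}
```

A paging client would loop calling `rangeQuery(tx, name, fromKey, toKey, capacity, flags)`, setting `fromKey = successor(rset.getLastKey())` after each page, until `rset.isExhausted()` returns true.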
From: Bryan T. <tho...@us...> - 2007-04-20 16:36:48
Update of /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv16041/src/java/com/bigdata/rdf/inf Modified Files: InferenceEngine.java RuleRdfs09.java AbstractRuleRdfs_2_3_7_9.java Log Message: Updated the IDataService interface and now have client talking over JERI to a data service instance for various btree operations, range count, and range query. Index: RuleRdfs09.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf/RuleRdfs09.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** RuleRdfs09.java 19 Apr 2007 19:13:45 -0000 1.6 --- RuleRdfs09.java 20 Apr 2007 16:36:36 -0000 1.7 *************** *** 67,73 **** /** ! * Overriden to be two bound (more selective), but otherwise also returning ! * data in POS order. The query is formed from triple(?v,rdf:type,stmt1.s) ! * and expressed in POS order as { rdf:type, stmt1.s, ?v }. */ protected SPO[] getStmts2( SPO stmt1 ) { --- 67,74 ---- /** ! * Overriden to be two bound (more selective, but also joining stmt1.s to ! * stmt2.o rather than to stmt2.p) and also returning data in POS order. The ! * query is formed from triple(?v,rdf:type,stmt1.s) and expressed in POS ! * order as { rdf:type, stmt1.s, ?v }. */ protected SPO[] getStmts2( SPO stmt1 ) { Index: InferenceEngine.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf/InferenceEngine.java,v retrieving revision 1.15 retrieving revision 1.16 diff -C2 -d -r1.15 -r1.16 *** InferenceEngine.java 18 Apr 2007 17:29:07 -0000 1.15 --- InferenceEngine.java 20 Apr 2007 16:36:36 -0000 1.16 *************** *** 551,555 **** * @param itr * The key scan iterator. ! * * @return The objects visited by that iterator. */ --- 551,555 ---- * @param itr * The key scan iterator. ! * * @return The objects visited by that iterator. 
*/ Index: AbstractRuleRdfs_2_3_7_9.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf/AbstractRuleRdfs_2_3_7_9.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** AbstractRuleRdfs_2_3_7_9.java 18 Apr 2007 17:29:07 -0000 1.1 --- AbstractRuleRdfs_2_3_7_9.java 20 Apr 2007 16:36:36 -0000 1.2 *************** *** 184,187 **** --- 184,197 ---- } + /** + * Builds the entailed triple from the matched triples. + * + * @param stmt1 + * The match on the 1st triple pattern. + * @param stmt2 + * The match on the 2nd triple pattern. + * + * @return The entailed triple. + */ protected abstract SPO buildStmt3( SPO stmt1, SPO stmt2 ); |
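`RuleRdfs09` above implements the rdfs9 entailment: from (u, rdfs:subClassOf, x) and (v, rdf:type, u), infer (v, rdf:type, x). The commit's point is that the second pattern is two-bound, so the POS index can answer it selectively. A toy in-memory version of the join, with hypothetical term-id constants standing in for identifiers the store would actually assign, and a nested loop standing in for the indexed POS scan:

```java
import java.util.ArrayList;
import java.util.List;

public class Rdfs9Sketch {

    record SPO(long s, long p, long o) {}

    // Hypothetical term identifiers; the real store assigns these
    // through its term index.
    static final long RDF_TYPE = 1L;
    static final long RDFS_SUBCLASSOF = 2L;

    // rdfs9: (u rdfs:subClassOf x) AND (v rdf:type u) entails
    // (v rdf:type x). The join binds stmt1.s against stmt2.o.
    static List<SPO> rdfs9(List<SPO> db) {
        List<SPO> out = new ArrayList<>();
        for (SPO stmt1 : db) {
            if (stmt1.p() != RDFS_SUBCLASSOF) continue;
            for (SPO stmt2 : db) {
                if (stmt2.p() == RDF_TYPE && stmt2.o() == stmt1.s()) {
                    out.add(new SPO(stmt2.s(), RDF_TYPE, stmt1.o()));
                }
            }
        }
        return out;
    }
}
```

With an index in POS order the inner loop becomes a single range scan keyed by { rdf:type, stmt1.s }, which is exactly the two-bound query the override describes.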
From: Bryan T. <tho...@us...> - 2007-04-20 16:36:48
Update of /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv16041/src/java/com/bigdata/rdf Modified Files: TripleStore.java Log Message: Updated the IDataService interface and now have client talking over JERI to a data service instance for various btree operations, range count, and range query. Index: TripleStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/TripleStore.java,v retrieving revision 1.33 retrieving revision 1.34 diff -C2 -d -r1.33 -r1.34 *** TripleStore.java 19 Apr 2007 19:13:46 -0000 1.33 --- TripleStore.java 20 Apr 2007 16:36:36 -0000 1.34 *************** *** 153,156 **** --- 153,163 ---- * restart-safe manner even if we "forget" the term-id mapping. * + * @todo Provide a wrapper class for {@link IEntryIterator} that visits + * statements, terms, and ids (as appropriate). This will help to isolate + * the rdf store from changes to the manner in which keys and values are + * encoded in the indices, including whether the term ids can be decoded + * from the statement keys, and also will help to create a + * (de-)serialization divide for the values in the indices. + * * @todo modify the term identifier assignment mechanism to be compatible with * the scale-out index partitions (32-bit unique within index partition *************** *** 186,193 **** * * @todo support metadata about the statement, e.g., whether or not it is an ! * inference. consider that we may need to move the triple/quad ids into * the value in the statement indices since some key compression schemes ! * are not reversable (we depend on reversable keys to extract the term ! * ids for a statement). * * @todo Try a variant in which we have metadata linking statements and terms --- 193,200 ---- * * @todo support metadata about the statement, e.g., whether or not it is an ! * inference. 
consider that we may need to move the triple/quad ids into * the value in the statement indices since some key compression schemes ! * are not reversable (we depend on reversable keys to extract the term ! * ids for a statement). * * @todo Try a variant in which we have metadata linking statements and terms *************** *** 233,240 **** * * @todo provide option for closing aspects of the entire store vs just a single ! * context in a quad store. For example, in an open web and internet scale ! * kb it is unlikely that you would want to have all harvested ontologies ! * closed against all the data. however, that might make more sense in a ! * more controlled setting. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> --- 240,255 ---- * * @todo provide option for closing aspects of the entire store vs just a single ! * context in a quad store vs just a "document" before it is loaded into a ! * triple store (but with the term identifiers of the triple store). For ! * example, in an open web and internet scale kb it is unlikely that you ! * would want to have all harvested ontologies closed against all the ! * data. however, that might make more sense in a more controlled setting. ! * ! * @todo provide a mechanism to make document loading robust to client failure. ! * When loads are unisolated, a client failure can result in the ! * statements being loaded into only a subset of the statement indices. ! * robust load would require a means for undo or redo of failed loads. a ! * loaded based on map/reduce would naturally provide a robust mechanism ! * using a redo model. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> |
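One `@todo` above warns that some key compression schemes are not reversible, while the store currently depends on reversible keys to extract the term ids of a statement. A fixed-width sketch of a reversible encoding (an assumed layout for illustration, not necessarily the encoding `TripleStore` actually uses): three big-endian 64-bit term ids, so unsigned byte-wise key comparison agrees with numeric term-id order for non-negative ids.

```java
import java.nio.ByteBuffer;

public class StatementKeySketch {

    // Encode a statement as s|p|o, each a big-endian 64-bit term id.
    // Big-endian layout keeps unsigned lexicographic key order aligned
    // with numeric order for non-negative ids.
    static byte[] encode(long s, long p, long o) {
        return ByteBuffer.allocate(24).putLong(s).putLong(p).putLong(o).array();
    }

    // The reverse mapping the @todo depends on: recover the three term
    // ids directly from the key, with no separate lookup.
    static long[] decode(byte[] key) {
        ByteBuffer b = ByteBuffer.wrap(key);
        return new long[] { b.getLong(), b.getLong(), b.getLong() };
    }
}
```

Moving the ids into the index value, as the `@todo` suggests, would remove this constraint and free the keys for non-reversible compression.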
From: Bryan T. <tho...@us...> - 2007-04-20 16:36:38
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv16005/src/java/com/bigdata/service Modified Files: AbstractServer.java DataServer.java EmbeddedDataService.java IDataService.java DataService.java DataServiceClient.java MetadataServer.java Log Message: Updated the IDataService interface and now have client talking over JERI to a data service instance for various btree operations, range count, and range query. Index: DataServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServer.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** DataServer.java 27 Mar 2007 14:34:23 -0000 1.4 --- DataServer.java 20 Apr 2007 16:36:27 -0000 1.5 *************** *** 52,55 **** --- 52,57 ---- import java.util.Properties; + import net.jini.config.Configuration; + import com.bigdata.journal.IJournal; import com.sun.jini.start.LifeCycle; *************** *** 58,71 **** * The bigdata data server. * - * @todo reduce the permissions required to start the server with the server - * starter. - * * @see src/resources/config for sample configurations. * - * @todo write tests against an standalone installation and then see what it - * looks like when the data services are running on more than one host. - * note that unisolated operations can be tested without a transaction - * server. - * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ --- 60,65 ---- *************** *** 74,78 **** --- 68,75 ---- /** + * Creates a new {@link DataServer}. + * * @param args + * The name of the {@link Configuration} file for the service. */ public DataServer(String[] args) { *************** *** 88,91 **** --- 85,94 ---- } + /** + * Starts a new {@link DataServer}. + * + * @param args + * The name of the {@link Configuration} file for the service. 
+ */ public static void main(String[] args) { *************** *** 104,108 **** * service. */ ! protected void destroy() { DataService service = (DataService)impl; --- 107,111 ---- * service. */ ! public void destroy() { DataService service = (DataService)impl; Index: DataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataService.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** DataService.java 16 Apr 2007 10:35:28 -0000 1.7 --- DataService.java 20 Apr 2007 16:36:27 -0000 1.8 *************** *** 48,55 **** --- 48,59 ---- package com.bigdata.service; + import java.io.Externalizable; import java.io.IOException; + import java.io.ObjectOutputStream; + import java.io.Serializable; import java.net.InetSocketAddress; import java.rmi.Remote; import java.util.Properties; + import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; *************** *** 61,64 **** --- 65,69 ---- import com.bigdata.btree.BatchLookup; import com.bigdata.btree.BatchRemove; + import com.bigdata.btree.BytesUtil; import com.bigdata.btree.IBatchBTree; import com.bigdata.btree.IBatchOp; *************** *** 68,74 **** import com.bigdata.btree.IReadOnlyBatchOp; import com.bigdata.btree.ISimpleBTree; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.ITx; - import com.bigdata.journal.IsolationEnum; import com.bigdata.journal.Journal; import com.bigdata.util.concurrent.DaemonThreadFactory; --- 73,79 ---- import com.bigdata.btree.IReadOnlyBatchOp; import com.bigdata.btree.ISimpleBTree; + import com.bigdata.isolation.UnisolatedBTree; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; import com.bigdata.util.concurrent.DaemonThreadFactory; *************** *** 345,359 **** } /** * ! * @todo the state of the op is changed as a side effect and needs to be ! 
* communicated back to a remote client. Also, the remote client does ! * not need to send uninitialized data across the network when the ! * batch operation will use the data purely for a response - we can ! * just initialize the data fields on this side of the interface and ! * then send them back across the network api. */ ! public void batchOp(long tx, String name, IBatchOp op) throws InterruptedException, ExecutionException { --- 350,434 ---- } + + public void registerIndex(String name,UUID indexUUID) { + + journal.serialize(new RegisterIndexTask(name,indexUUID)); + + } + + public void dropIndex(String name) { + + journal.serialize(new DropIndexTask(name)); + + } + + public void batchInsert(long tx, String name, int ntuples, byte[][] keys, + byte[][] vals) throws InterruptedException, ExecutionException { + + batchOp( tx, name, new BatchInsert(ntuples, keys, vals)); + + } + + public boolean[] batchContains(long tx, String name, int ntuples, byte[][] keys + ) throws InterruptedException, ExecutionException { + + BatchContains op = new BatchContains(ntuples, keys, new boolean[ntuples]); + + batchOp( tx, name, op ); + + return op.contains; + + } + + public byte[][] batchLookup(long tx, String name, int ntuples, byte[][] keys) + throws InterruptedException, ExecutionException { + + BatchLookup op = new BatchLookup(ntuples,keys,new byte[ntuples][]); + + batchOp(tx, name, op); + + return (byte[][])op.values; + + } + + public byte[][] batchRemove(long tx, String name, int ntuples, + byte[][] keys, boolean returnOldValues) + throws InterruptedException, ExecutionException { + + BatchRemove op = new BatchRemove(ntuples,keys,new byte[ntuples][]); + + batchOp(tx, name, op); + + return returnOldValues ? (byte[][])op.values : null; + + } /** + * Executes a batch operation on a named btree. * ! * @param tx ! * The transaction identifier -or- zero (0L) IFF the operation is ! * NOT isolated by a transaction. ! * @param name ! * The index name (required). ! * @param op ! 
* The batch operation. ! * ! * @exception InterruptedException ! * if the operation was interrupted (typically by ! * {@link #shutdownNow()}. ! * @exception ExecutionException ! * If the operation caused an error. See ! * {@link ExecutionException#getCause()} for the underlying ! * error. ! * ! * @todo it is possible to have concurrent execution of batch operations for ! * distinct indices. In order to support this, the write thread would ! * have to become a pool of N worker threads fed from a queue of ! * operations. Concurrent writers can execute as long as they are ! * writing on different indices. (Concurrent readers can execute as ! * long as they are reading from a historical commit time.) */ ! protected void batchOp(long tx, String name, IBatchOp op) throws InterruptedException, ExecutionException { *************** *** 415,435 **** } ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, ! byte[] toKey, int flags) throws InterruptedException, ExecutionException { if( name == null ) throw new IllegalArgumentException(); ! if (tx == 0L) ! throw new UnsupportedOperationException( ! "Unisolated context not allowed"); ! ! RangeQueryResult result = (RangeQueryResult) txService.submit( ! new RangeQueryTask(tx, name, fromKey, toKey, flags)).get(); ! return result; } ! // /** // * @todo if unisolated or isolated at the read-commit level, then the --- 490,555 ---- } + + public int rangeCount(long tx, String name, byte[] fromKey, byte[] toKey) + throws InterruptedException, ExecutionException { + + if (name == null) + throw new IllegalArgumentException(); + + final RangeCountTask task = new RangeCountTask(tx, name, fromKey, toKey); + + final boolean isolated = tx != 0L; + + if (isolated) { + + return (Integer) readService.submit(task).get(); + + } else { + + return (Integer) journal.serialize(task).get(); + + } + + } ! /** ! * ! * FIXME the iterator needs to be aware of the defintion of a "row" for the ! 
* sparse row store so that we can respect the atomic guarentee for reads as ! * well as writes. ! * ! * FIXME support filters. there are a variety of use cases from clients that ! * are aware of version counters and delete markers to clients that encode a ! * column name and datum or write time into the key to those that will ! * filter based on inspection of the value associated with the key, e.g., ! * only values having some attribute. ! * ! * @todo if we allow the filter to cause mutations (e.g., deleting matching ! * entries) then we have to examine the operation to determine whether ! * or not we need to use the {@link #txService} or the ! * {@link #readService} ! */ ! public ResultSet rangeQuery(long tx, String name, byte[] fromKey, ! byte[] toKey, int capacity, int flags) throws InterruptedException, ExecutionException { if( name == null ) throw new IllegalArgumentException(); ! final RangeQueryTask task = new RangeQueryTask(tx, name, fromKey, ! toKey, capacity, flags); ! ! final boolean isolated = tx != 0L; ! if(isolated) { ! ! return (ResultSet) readService.submit(task).get(); ! ! } else { ! ! return (ResultSet) journal.serialize(task).get(); ! ! } } ! // /** // * @todo if unisolated or isolated at the read-commit level, then the *************** *** 458,462 **** // int flags = 0; // @todo set to deliver keys + values for map op. // ! // RangeQueryResult result = (RangeQueryResult) txService.submit( // new RangeQueryTask(tx, name, fromKey, toKey, flags)).get(); // --- 578,582 ---- // int flags = 0; // @todo set to deliver keys + values for map op. // ! 
// ResultSet result = (ResultSet) txService.submit( // new RangeQueryTask(tx, name, fromKey, toKey, flags)).get(); // *************** *** 467,470 **** --- 587,671 ---- // // } + + /** + * + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ + private abstract class AbstractIndexManagementTask implements Callable<Object> { + + protected final String name; + + protected AbstractIndexManagementTask(String name) { + + if(name==null) throw new IllegalArgumentException(); + + this.name = name; + + } + + } + + private class RegisterIndexTask extends AbstractIndexManagementTask { + + protected final UUID indexUUID; + + public RegisterIndexTask(String name,UUID indexUUID) { + + super(name); + + if(indexUUID==null) throw new IllegalArgumentException(); + + this.indexUUID = indexUUID; + + } + + public Object call() throws Exception { + + IIndex ndx = journal.getIndex(name); + + if(ndx != null) { + + if(!ndx.getIndexUUID().equals(indexUUID)) { + + throw new IllegalStateException( + "Index already registered with that name and a different indexUUID"); + + } + + return ndx; + + } + + ndx = journal.registerIndex(name, new UnisolatedBTree(journal, indexUUID)); + + journal.commit(); + + return ndx; + + } + + } + + private class DropIndexTask extends AbstractIndexManagementTask { + + public DropIndexTask(String name) { + + super(name); + + } + + public Object call() throws Exception { + + journal.dropIndex(name); + + journal.commit(); + + return null; + + } + + } /** *************** *** 650,684 **** } ! private class RangeQueryTask implements Callable<Object> { private final String name; private final byte[] fromKey; private final byte[] toKey; - private final int flags; private final ITx tx; ! public RangeQueryTask(long startTime, String name, byte[] fromKey, ! byte[] toKey, int flags) { ! assert startTime != 0L; ! tx = journal.getTx(startTime); ! if (tx == null) { ! throw new IllegalStateException("Unknown tx"); } ! if (!tx.isActive()) { ! 
throw new IllegalStateException("Tx not active"); } ! if( tx.getIsolationLevel() == IsolationEnum.ReadCommitted ) { ! throw new UnsupportedOperationException("Read-committed not supported"); } --- 851,985 ---- } ! private class RangeCountTask implements Callable<Object> { + // startTime or 0L iff unisolated. + private final long startTime; private final String name; private final byte[] fromKey; private final byte[] toKey; private final ITx tx; ! public RangeCountTask(long startTime, String name, byte[] fromKey, ! byte[] toKey) { ! this.startTime = startTime; ! if(startTime != 0L) { ! ! /* ! * Isolated read. ! */ ! ! tx = journal.getTx(startTime); ! ! if (tx == null) { ! ! throw new IllegalStateException("Unknown tx"); ! ! } ! ! if (!tx.isActive()) { ! ! throw new IllegalStateException("Tx not active"); ! ! } ! ! } else { ! ! /* ! * Unisolated read. ! */ ! ! tx = null; ! ! } ! this.name = name; ! this.fromKey = fromKey; ! this.toKey = toKey; ! ! } ! ! public IIndex getIndex(String name) { ! ! if(tx==null) { ! ! return journal.getIndex(name); ! ! } else { ! return tx.getIndex(name); } + + } + + public Object call() throws Exception { ! IIndex ndx = getIndex(name); ! ! if(ndx==null) { ! throw new IllegalStateException("No such index: "+name); } + + return new Integer(ndx.rangeCount(fromKey, toKey)); ! } ! ! } ! ! private class RangeQueryTask implements Callable<Object> { ! ! // startTime or 0L iff unisolated. ! private final long startTime; ! private final String name; ! private final byte[] fromKey; ! private final byte[] toKey; ! private final int capacity; ! private final int flags; ! ! private final ITx tx; ! ! public RangeQueryTask(long startTime, String name, byte[] fromKey, ! byte[] toKey, int capacity, int flags) { ! ! this.startTime = startTime; ! ! if(startTime != 0L) { ! /* ! * Isolated read. ! */ ! ! tx = journal.getTx(startTime); ! ! if (tx == null) { ! ! throw new IllegalStateException("Unknown tx"); ! ! } ! ! if (!tx.isActive()) { ! ! 
throw new IllegalStateException("Tx not active"); ! ! } ! ! // if( tx.getIsolationLevel() == IsolationEnum.ReadCommitted ) { ! // ! // throw new UnsupportedOperationException("Read-committed not supported"); ! // ! // } ! ! } else { ! ! /* ! * Unisolated read. ! */ ! ! tx = null; } *************** *** 687,690 **** --- 988,992 ---- this.fromKey = fromKey; this.toKey = toKey; + this.capacity = capacity; this.flags = flags; *************** *** 693,697 **** public IIndex getIndex(String name) { ! return tx.getIndex(name); } --- 995,1007 ---- public IIndex getIndex(String name) { ! if(tx==null) { ! ! return journal.getIndex(name); ! ! } else { ! ! return tx.getIndex(name); ! ! } } *************** *** 701,758 **** IIndex ndx = getIndex(name); ! final int count = ndx.rangeCount(fromKey, toKey); ! boolean countOnly = false; ! final IEntryIterator itr = (countOnly ? null : ndx.rangeIterator( ! fromKey, toKey)); ! return new RangeQueryResult(count, itr, tx.getStartTimestamp(), ! name, fromKey, toKey, flags); } } ! /** - * @todo must keep track of the open iterators on the transaction and - * invalidate them once the transaction completes. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public static class RangeQueryResult { ! public final int count; ! public final IEntryIterator itr; ! public final long startTime; ! public final String name; ! public final byte[] fromKey; ! public final byte[] toKey; ! public final int flags; ! public RangeQueryResult(int count, IEntryIterator itr, long startTime, String name, ! byte[] fromKey, byte[] toKey, int flags) { ! this.count = count; ! this.itr = itr; - this.startTime = startTime; - this.name = name; - this.fromKey = fromKey; - this.toKey = toKey; - this.flags = flags; - } } ! /** ! * Abstract class for tasks that execute batch api operations. There are ! * various concrete subclasses, each of which MUST be submitted to the ! * appropriate service for execution. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> --- 1011,1160 ---- IIndex ndx = getIndex(name); ! if(ndx==null) { ! ! throw new IllegalStateException("No such index: "+name); ! ! } ! final boolean sendKeys = (flags & KEYS) != 0; ! final boolean sendVals = (flags & VALS) != 0; ! /* ! * setup iterator since we will visit keys and/or values in the key ! * range. ! */ ! return new ResultSet(ndx, fromKey, toKey, capacity, sendKeys, ! sendVals); } } ! /** * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ + * + * @todo implement {@link Externalizable}, probably buffer the + * {@link ObjectOutputStream} and focus on byte[] transfers. */ ! public static class ResultSet implements Serializable { ! /** ! * Total #of key-value pairs within the key range (approximate). ! */ ! public final int rangeCount; ! /** ! * Actual #of key-value pairs in the {@link ResultSet} ! */ ! public final int ntuples; ! /** ! * True iff the iterator exhausted the available keys such that no more ! * results would be available if you formed the successor of the ! * {@link #lastKey}. ! */ ! final public boolean exhausted; ! ! /** ! * The last key visited by the iterator <em>regardless</em> of the ! * filter imposed -or- <code>null</code> iff no keys were visited by ! * the iterator for the specified key range. ! * ! * @see #nextKey() ! */ ! public final byte[] lastKey; ! /** ! * The next key that should be used to retrieve keys and/or values ! * starting from the first possible successor of the {@link #lastKey} ! * visited by the iterator in this operation (the successor is formed by ! * appending a <code>nul</code> byte to the {@link #lastKey}). ! * ! * @return The successor of {@link #lastKey} -or- <code>null</code> ! * iff the iterator exhausted the available keys. ! * ! * @exception UnsupportedOperationException ! * if the {@link #lastKey} is <code>null</code>. ! */ ! public byte[] nextKey() { ! if (lastKey == null) ! 
throw new UnsupportedOperationException(); ! return BytesUtil.successor(lastKey); } + /** + * The visited keys iff the {@link RangeQueryEnum#Keys} flag was set. + */ + public final byte[][] keys; + + /** + * The visited values iff the {@link RangeQueryEnum#Values} flag was + * set. + */ + public final byte[][] vals; + + public ResultSet(final IIndex ndx, final byte[] fromKey, + final byte[] toKey, final int capacity, final boolean sendKeys, + final boolean sendVals) { + + // The upper bound on the #of key-value pairs in the range. + rangeCount = ndx.rangeCount(fromKey, toKey); + + final int limit = (rangeCount > capacity ? capacity : rangeCount); + + int ntuples = 0; + + keys = (sendKeys ? new byte[limit][] : null); + + vals = (sendVals ? new byte[limit][] : null); + + // iterator that will visit the key range. + IEntryIterator itr = ndx.rangeIterator(fromKey, toKey); + + /* + * true if any keys were visited regardless of whether or not they + * satisified the optional filter. This is used to make sure that we + * always return the lastKey visited if any keys were visited and + * otherwise set lastKey := null. + */ + boolean anything = false; + + while (ntuples < limit && itr.hasNext()) { + + anything = true; + + byte[] val = (byte[]) itr.next(); + + if (sendVals) + vals[ntuples] = val; + + if (sendKeys) + keys[ntuples] = itr.getKey(); + + // #of results that will be returned. + ntuples++; + + } + + this.ntuples = ntuples; + + this.lastKey = (anything ? itr.getKey() : null); + + this.exhausted = ! itr.hasNext(); + + } + } ! /** ! * Abstract class for tasks that execute {@link IProcedure} operations. ! * There are various concrete subclasses, each of which MUST be submitted to ! * the appropriate service for execution. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> Index: IDataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IDataService.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** IDataService.java 16 Apr 2007 10:02:50 -0000 1.7 --- IDataService.java 20 Apr 2007 16:36:27 -0000 1.8 *************** *** 45,56 **** import java.io.IOException; import java.util.concurrent.ExecutionException; import com.bigdata.btree.BTree; ! import com.bigdata.btree.IBatchOp; import com.bigdata.journal.ITransactionManager; import com.bigdata.journal.ITxCommitProtocol; ! import com.bigdata.journal.IsolationEnum; ! import com.bigdata.service.DataService.RangeQueryResult; /** --- 45,56 ---- import java.io.IOException; + import java.util.UUID; import java.util.concurrent.ExecutionException; import com.bigdata.btree.BTree; ! import com.bigdata.isolation.UnisolatedBTree; import com.bigdata.journal.ITransactionManager; import com.bigdata.journal.ITxCommitProtocol; ! import com.bigdata.service.DataService.ResultSet; /** *************** *** 94,97 **** --- 94,160 ---- /** + * A constant that may be used as the transaction identifier when the + * operation is <em>unisolated</em> (non-transactional). The value of + * this constant is ZERO (0L). + */ + public static final long UNISOLATED = 0L; + + /** + * Flag specifies that keys in the key range will be returned. When not + * given, the keys will NOT be included in the {@link ResultSet} sent to the + * client. + */ + public static final int KEYS = 1 << 0; + + /** + * Flag specifies that values in the key range will be returned. When not + * given, the values will NOT be included in the {@link ResultSet} sent to + * the client. + */ + public static final int VALS = 1 << 1; + + /** + * Register a named mutable B+Tree on the {@link DataService} (unisolated). + * The keys will be variable length unsigned byte[]s. 
The values will be + * variable length byte[]s. The B+Tree will support version counters and + * delete markers (it will be compatible with the use of transactions for + * concurrency control). + * + * @param name + * The name that can be used to recover the index. + * + * @param indexUUID + * The UUID that identifies the index. When the mutable B+Tree is + * part of a scale-out index, then you MUST provide the indexUUID + * for that scale-out index. Otherwise this MUST be a random + * UUID, e.g., using {@link UUID#randomUUID()}. + * + * @return The object that would be returned by {@link #getIndex(String)}. + * + * @todo exception if index exists? + * + * @todo provide configuration options {whether the index supports isolation + * or not ({@link BTree} vs {@link UnisolatedBTree}), the branching + * factor for the index, and the value serializer. For a client server + * divide I think that we can always go with an + * {@link UnisolatedBTree}. We should pass in the UUID so that this + * can be used by the {@link MetadataService} to create mutable btrees + * to absorb writes when one or more partitions of a scale out index + * are mapped onto the {@link DataService}. + */ + public void registerIndex(String name,UUID uuid) throws IOException; + + /** + * Drops the named index (unisolated). + * + * @param name + * The name of the index to be dropped. + * + * @exception IllegalArgumentException + * if <i>name</i> does not identify a registered index. + */ + public void dropIndex(String name) throws IOException; + + /** * <p> * Used by the client to submit a batch operation on a named B+Tree *************** *** 133,136 **** --- 196,201 ---- * The batch operation. * + * @exception IOException + * if there was a problem with the RPC. * @exception InterruptedException * if the operation was interrupted (typically by *************** *** 141,154 **** * error. * ! * @todo it is possible to have concurrent execution of batch operations for ! * distinct indices. 
In order to support this, the write thread would ! * have to become a pool of N worker threads fed from a queue of ! * operations. Concurrent writers can execute as long as they are ! * writing on different indices. (Concurrent readers can execute as ! * long as they are reading from a historical commit time.) */ ! public void batchOp(long tx, String name, IBatchOp op) throws InterruptedException, ExecutionException, IOException; ! /** * <p> --- 206,228 ---- * error. * ! * @todo javadoc update. ! * @todo support extension operations (read or mutable). */ ! public void batchInsert(long tx, String name, int ntuples, byte[][] keys, ! byte[][] values) throws InterruptedException, ExecutionException, ! IOException; ! ! public boolean[] batchContains(long tx, String name, int ntuples, ! byte[][] keys) throws InterruptedException, ExecutionException, ! IOException; ! ! public byte[][] batchLookup(long tx, String name, int ntuples, ! byte[][] keys) throws InterruptedException, ExecutionException, ! IOException; ! ! public byte[][] batchRemove(long tx, String name, int ntuples, ! byte[][] keys, boolean returnOldValues) throws InterruptedException, ExecutionException, IOException; ! /** * <p> *************** *** 156,172 **** * </p> * <p> ! * Note: The rangeQuery operation is NOT allowed for read-committed ! * transactions (the underlying constraint is that the {@link BTree} does ! * NOT support traversal under concurrent modification so this operation is ! * limited to read-only or fully isolated transactions or to unisolated ! * reads against a historical commit time). * </p> * * @param tx * @param name * @param fromKey * @param toKey * @param flags ! * (@todo define flags: count yes/no, keys yes/no, values yes/no) * * @exception InterruptedException --- 230,257 ---- * </p> * <p> ! * In order to visit all keys in a range, clients are expected to issue ! * repeated calls in which the <i>fromKey</i> is incremented to the ! 
* successor of the last key visited until either an empty {@link ResultSet} ! * is returned or the {@link ResultSet#isLast()} flag is set, indicating ! * that all keys up to (but not including) the <i>startKey</i> have been ! * visited. * </p> * * @param tx + * The transaction identifier -or- zero (0L) IFF the operation is + * NOT isolated by a transaction. * @param name + * The index name (required). * @param fromKey + * The starting key for the scan. * @param toKey + * The first key that will not be visited. + * @param capacity + * The maximum #of key-values to return. (This must be rounded up + * if necessary in order to all values selected for a single row + * of a sparse row store.) * @param flags ! * One or more flags formed by bitwise OR of the constants ! * defined by {@link RangeQueryEnum}. * * @exception InterruptedException *************** *** 177,194 **** * {@link ExecutionException#getCause()} for the underlying * error. - * @exception UnsupportedOperationException - * If the tx is zero (0L) (indicating an unisolated - * operation) -or- if the identifed transaction is - * {@link IsolationEnum#ReadCommitted}. * ! * FIXME support filters. there are a variety of use cases from clients that ! * are aware of version counters and delete markers to clients that encode a ! * column name and datum or write time into the key to those that will ! * filter based on inspection of the value associated with the key, e.g., ! * only values having some attribute. */ ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, ! byte[] toKey, int flags) throws InterruptedException, ExecutionException, IOException, IOException; /** * <p> --- 262,360 ---- * {@link ExecutionException#getCause()} for the underlying * error. * ! * @todo provide for optional filter. */ ! public ResultSet rangeQuery(long tx, String name, byte[] fromKey, ! 
byte[] toKey, int capacity, int flags) throws InterruptedException, ExecutionException, IOException, IOException; + + /** + * <p> + * Range count of entries in a key range for the named index on this + * {@link DataService}. + * </p> + * + * @param tx + * The transaction identifier -or- zero (0L) IFF the operation is + * NOT isolated by a transaction. + * @param name + * The index name (required). + * @param fromKey + * The starting key for the scan. + * @param toKey + * The first key that will not be visited. + * + * @return The upper bound estimate of the #of key-value pairs in the key + * range of the partition(s) of the named index found on this + * {@link DataService}. + * + * @exception InterruptedException + * if the operation was interrupted (typically by + * {@link #shutdownNow()}. + * @exception ExecutionException + * If the operation caused an error. See + * {@link ExecutionException#getCause()} for the underlying + * error. + */ + public int rangeCount(long tx, String name, byte[] fromKey, byte[] toKey) + throws InterruptedException, ExecutionException, IOException, + IOException; + + // /** + // * Typesafe enum for flags that control the behavior of + // * {@link IDataService#rangeQuery(long, String, byte[], byte[], int, int)} + // * + // * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + // * @version $Id$ + // */ + // public static enum RangeQueryEnum { + // + // /** + // * Flag specifies that keys in the key range will be returned. When not + // * given, the keys will NOT be included in the {@link ResultSetChunk}s + // * sent to the client. + // */ + // Keys(1 << 1), + // + // /** + // * Flag specifies that values in the key range will be returned. When + // * not given, the values will NOT be included in the + // * {@link ResultSetChunk}s sent to the client. 
+ // */ + // Values(1 << 2); + // + // private final int flag; + // + // private RangeQueryEnum(int flag) { + // + // this.flag = flag; + // + // } + // + // /** + // * True iff this flag is set. + // * + // * @param flags + // * An integer on which zero or more flags have been set. + // * + // * @return True iff this flag is set. + // */ + // public boolean isSet(int flags) { + // + // return (flags & flag) == 1; + // + // } + // + // /** + // * The bit mask for this flag. + // */ + // public final int valueOf() { + // + // return flag; + // + // } + // + // }; + /** * <p> *************** *** 220,223 **** --- 386,390 ---- * The procedure to be executed. * + * @throws IOException * @throws InterruptedException * @throws ExecutionException Index: MetadataServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MetadataServer.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** MetadataServer.java 27 Mar 2007 14:34:24 -0000 1.1 --- MetadataServer.java 20 Apr 2007 16:36:27 -0000 1.2 *************** *** 188,192 **** * metadata service. */ ! protected void destroy() { MetadataService service = (MetadataService)impl; --- 188,192 ---- * metadata service. */ ! public void destroy() { MetadataService service = (MetadataService)impl; Index: DataServiceClient.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServiceClient.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** DataServiceClient.java 13 Apr 2007 15:04:19 -0000 1.5 --- DataServiceClient.java 20 Apr 2007 16:36:27 -0000 1.6 *************** *** 54,58 **** import com.bigdata.btree.IBatchOp; import com.bigdata.journal.ValidationError; ! import com.bigdata.service.DataService.RangeQueryResult; /** --- 54,58 ---- import com.bigdata.btree.IBatchOp; import com.bigdata.journal.ValidationError; ! 
import com.bigdata.service.DataService.ResultSet; /** *************** *** 66,70 **** * service will support a scale-out solution. */ ! public class DataServiceClient implements IDataService { final IDataService delegate; --- 66,70 ---- * service will support a scale-out solution. */ ! abstract public class DataServiceClient implements IDataService { final IDataService delegate; *************** *** 105,115 **** } ! public void batchOp(long tx, String name, IBatchOp op) throws InterruptedException, ExecutionException, IOException { ! delegate.batchOp(tx, name, op); ! } ! ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, int flags) throws InterruptedException, ExecutionException, IOException { ! return delegate.rangeQuery(tx, name, fromKey, toKey, flags); ! } public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException { --- 105,115 ---- } ! // public void batchOp(long tx, String name, IBatchOp op) throws InterruptedException, ExecutionException, IOException { ! // delegate.batchOp(tx, name, op); ! // } ! // ! // public ResultSet rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, int flags) throws InterruptedException, ExecutionException, IOException { ! // return delegate.rangeQuery(tx, name, fromKey, toKey, flags); ! // } public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException { Index: EmbeddedDataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/EmbeddedDataService.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** EmbeddedDataService.java 13 Apr 2007 15:04:19 -0000 1.4 --- EmbeddedDataService.java 20 Apr 2007 16:36:27 -0000 1.5 *************** *** 56,60 **** import com.bigdata.btree.IBatchOp; import com.bigdata.journal.ValidationError; ! 
import com.bigdata.service.DataService.RangeQueryResult; import com.bigdata.util.concurrent.DaemonThreadFactory; --- 56,60 ---- import com.bigdata.btree.IBatchOp; import com.bigdata.journal.ValidationError; ! import com.bigdata.service.DataService.ResultSet; import com.bigdata.util.concurrent.DaemonThreadFactory; *************** *** 71,75 **** * using delegation patterns). */ ! public class EmbeddedDataService implements IDataService, IServiceShutdown { private final DataService delegate; --- 71,75 ---- * using delegation patterns). */ ! abstract public class EmbeddedDataService implements IDataService, IServiceShutdown { private final DataService delegate; *************** *** 118,124 **** // } ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, int flags) throws InterruptedException, ExecutionException { ! return delegate.rangeQuery(tx, name, fromKey, toKey, flags); ! } public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException { --- 118,124 ---- // } ! // public ResultSet rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, int flags) throws InterruptedException, ExecutionException { ! // return delegate.rangeQuery(tx, name, fromKey, toKey, flags); ! // } public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException { Index: AbstractServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/AbstractServer.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** AbstractServer.java 27 Mar 2007 14:34:23 -0000 1.5 --- AbstractServer.java 20 Apr 2007 16:36:27 -0000 1.6 *************** *** 185,188 **** --- 185,197 ---- private boolean open = false; + + /** + * Return the assigned {@link ServiceID}. 
+ */ + public ServiceID getServiceID() { + + return serviceID; + + } /** *************** *** 460,463 **** --- 469,474 ---- log.info("serviceID=" + serviceID); + this.serviceID = serviceID; + if (serviceIdFile != null) { *************** *** 649,653 **** * state. */ ! protected void destroy() { shutdownNow(); --- 660,664 ---- * state. */ ! public void destroy() { shutdownNow(); |
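The ResultSet contract spelled out above implies a client-side paging loop: issue rangeQuery, then re-issue it from the successor of lastKey (formed by appending a nul byte, per the nextKey() javadoc) until exhausted is reported. The following is a minimal in-memory sketch of that loop; the Page class, the rangeQuery stub, and the TreeSet index are hypothetical stand-ins for the remote IDataService proxy and its ResultSet, not the actual bigdata code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

public class RangePagingSketch {

    // Unsigned lexicographic order on byte[] keys (what a B+Tree key scan uses).
    static final Comparator<byte[]> UNSIGNED = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = (a[i] & 0xff) - (b[i] & 0xff);
            if (c != 0) return c;
        }
        return a.length - b.length;
    };

    // Successor per the ResultSet javadoc: append a nul byte to lastKey.
    static byte[] successor(byte[] key) {
        return Arrays.copyOf(key, key.length + 1); // trailing byte is zero
    }

    // Minimal analogue of DataService.ResultSet (keys only).
    static class Page {
        List<byte[]> keys = new ArrayList<>();
        byte[] lastKey;    // null iff no keys were visited
        boolean exhausted; // true iff no keys remain past lastKey
    }

    // Hypothetical stand-in for proxy.rangeQuery(tx, name, fromKey, null, capacity, KEYS).
    static Page rangeQuery(TreeSet<byte[]> index, byte[] fromKey, int capacity) {
        Page page = new Page();
        for (byte[] key : (fromKey == null ? index : index.tailSet(fromKey, true))) {
            if (page.keys.size() == capacity) {
                return page; // capacity reached; exhausted stays false
            }
            page.keys.add(key);
            page.lastKey = key;
        }
        page.exhausted = true; // ran off the end of the key range
        return page;
    }

    public static void main(String[] args) {
        TreeSet<byte[]> index = new TreeSet<>(UNSIGNED);
        for (int i = 0; i < 10; i++) index.add(new byte[] { (byte) i });

        // Page through the whole key range, 3 tuples at a time.
        int visited = 0;
        byte[] fromKey = null; // null scans from the head of the index
        while (true) {
            Page page = rangeQuery(index, fromKey, 3);
            visited += page.keys.size();
            if (page.exhausted || page.lastKey == null) break;
            fromKey = successor(page.lastKey); // first possible next key
        }
        System.out.println("visited=" + visited); // all 10 keys, in 4 calls
    }
}
```

The appended-nul successor matters: incrementing the last byte instead would skip any longer keys that share lastKey as a prefix.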
From: Bryan T. <tho...@us...> - 2007-04-20 16:36:38
Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv16005/src/test/com/bigdata/service Modified Files: TestServiceDiscovery.java TestAll.java Added Files: TestDataServer0.java Log Message: Updated the IDataService interface and now have client talking over JERI to a data service instance for various btree operations, range count, and range query. Index: TestServiceDiscovery.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/service/TestServiceDiscovery.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TestServiceDiscovery.java 22 Mar 2007 21:11:24 -0000 1.1 --- TestServiceDiscovery.java 20 Apr 2007 16:36:27 -0000 1.2 *************** *** 114,124 **** /** ! * Tests spans several services and verifies that we can discovery each of ! * them. * * @throws IOException * @throws ClassNotFoundException */ - public void test_serviceDiscovery() throws IOException, ClassNotFoundException { --- 114,123 ---- /** ! * Tests launches a service and verifies that we can discovery it and invoke ! * an operation exposed by that service. * * @throws IOException * @throws ClassNotFoundException */ public void test_serviceDiscovery() throws IOException, ClassNotFoundException { Index: TestAll.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/service/TestAll.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TestAll.java 22 Mar 2007 21:11:24 -0000 1.1 --- TestAll.java 20 Apr 2007 16:36:27 -0000 1.2 *************** *** 57,61 **** * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ - public class TestAll extends TestCase { --- 57,60 ---- *************** *** 71,75 **** suite.addTestSuite( TestServiceDiscovery.class ); ! // suite.addTestSuite( TestServer.class ); // Does not implement TestCase. 
return suite; --- 70,80 ---- suite.addTestSuite( TestServiceDiscovery.class ); ! /* ! * Test of a single client talking to a single data service instance ! * without the use of the metadata service or a transaction manager. ! */ ! suite.addTestSuite( TestDataServer0.class ); ! ! // suite.addTestSuite( TestServer.class ); // Does not implement TestCase. return suite; --- NEW FILE: TestDataServer0.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. 
Modifications: */ /* * Created on Apr 20, 2007 */ package com.bigdata.service; import java.io.IOException; import java.net.InetAddress; import java.util.UUID; import junit.framework.AssertionFailedError; import junit.framework.TestCase2; import net.jini.core.discovery.LookupLocator; import net.jini.core.lookup.ServiceID; import net.jini.core.lookup.ServiceRegistrar; import net.jini.core.lookup.ServiceTemplate; import com.bigdata.service.DataService.ResultSet; /** * Test of client-server communications. The test starts a {@link DataServer} * and then verifies that basic operations can be carried out against that * server. The server is stopped when the test is torn down. * <p> * Note: This test uses the <code>DataServer0.config</code> file from the * src/resources/config/standalone package. * <p> * Note: The <code>bigdata</code> JAR must be current in order for the client * and the service to agree on interface definitions, etc. You can use * <code>build.xml</code> in the root of this module to update that JAR. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @todo write another test class that accesses more than one * {@link DataService} instance, e.g., by placing a different index on * each {@link DataService}. */ public class TestDataServer0 extends TestCase2 { /** * */ public TestDataServer0() { } /** * @param arg0 */ public TestDataServer0(String arg0) { super(arg0); } String[] args = new String[]{ "src/resources/config/standalone/DataServer0.config" }; DataServer dataServer0; // start server in its own thread. public void setUp() throws Exception { dataServer0 = new DataServer(args); new Thread() { public void run() { dataServer0.run(); } }.start(); } /** * destroy the test service. */ public void tearDown() throws Exception { dataServer0.destroy(); } /** * Return the {@link ServiceID} of the specific service that we launched in * #setUp(). 
* * @exception AssertionFailedError * If the {@link ServiceID} can not be found after a timeout. * * @exception InterruptedException * if the thread is interrupted while it is waiting to retry. */ protected ServiceID getServiceID() throws AssertionFailedError, InterruptedException { ServiceID serviceID = null; for(int i=0; i<10 && serviceID == null; i++) { /* * Note: This can be null since the serviceID is not assigned * synchonously by the registrar. */ serviceID = dataServer0.getServiceID(); if(serviceID == null) { /* * We wait a bit and retry until we have it or timeout. */ Thread.sleep(200); } } assertNotNull("serviceID",serviceID); return serviceID; } /** * Exercises the basic features of the {@link IDataService} interface using * unisolated operations, including creating a B+Tree, insert, contains, * lookup, and remove operations on key-value pairs in that B+Tree, and * dropping the B+Tree. * * @throws Exception */ public void test_serverRunning() throws Exception { ServiceID serviceID = getServiceID(); IDataService proxy = lookupDataService(serviceID); assertNotNull("service not discovered",proxy); final String name = "testIndex"; // add an index. proxy.registerIndex(name, UUID.randomUUID()); // batch insert into that index. proxy.batchInsert(IDataService.UNISOLATED, name, 1, new byte[][] { new byte[] { 1 } }, new byte[][] { new byte[] { 1 } }); // verify keys that exist/do not exist. boolean contains[] = proxy.batchContains(IDataService.UNISOLATED, name, 2, new byte[][] { new byte[] { 1 }, new byte[] { 2 } }); assertNotNull(contains); assertEquals(2,contains.length); assertTrue(contains[0]); assertFalse(contains[1]); // lookup keys that do and do not exist. 
byte[][] values = proxy.batchLookup(IDataService.UNISOLATED, name, 2, new byte[][] { new byte[] { 1 }, new byte[] { 2 } }); assertNotNull(values); assertEquals(2,values.length); assertEquals(new byte[]{1},values[0]); assertEquals(null,values[1]); // rangeCount the all partitions of the index on the data service. assertEquals(1, proxy.rangeCount(IDataService.UNISOLATED, name, null, null)); /* * visit all keys and values for the partitions of the index on the data * service. */ final int flags = IDataService.KEYS | IDataService.VALS; ResultSet rset = proxy.rangeQuery(IDataService.UNISOLATED, name, null, null, 100, flags ); assertEquals("rangeCount",1,rset.rangeCount); assertEquals("ntuples",1,rset.ntuples); assertTrue("exhausted",rset.exhausted); assertEquals("lastKey",new byte[]{1},rset.lastKey); assertEquals("keys.length",1,rset.keys.length); assertEquals(new byte[]{1},rset.keys[0]); assertEquals("vals.length",1,rset.vals.length); assertEquals(new byte[]{1},rset.vals[0]); // remove key that exists, verifying the returned values. values = proxy.batchRemove(IDataService.UNISOLATED, name, 2, new byte[][] { new byte[] { 1 }, new byte[] { 2 } }, true); assertNotNull(values); assertEquals(2,values.length); assertEquals(new byte[]{1},values[0]); assertEquals(null,values[1]); // remove again, verifying that the returned values are now null. values = proxy.batchRemove(IDataService.UNISOLATED, name, 2, new byte[][] { new byte[] { 1 }, new byte[] { 2 } }, true); assertNotNull(values); assertEquals(2,values.length); assertEquals(null,values[0]); assertEquals(null,values[1]); proxy.dropIndex(name); } /** * Lookup a {@link DataService} by its {@link ServiceID}. * * @param serviceID * The {@link ServiceID}. */ public IDataService lookupDataService(ServiceID serviceID) throws IOException, ClassNotFoundException, InterruptedException { /* * Lookup the discover service (unicast on localhost). */ // get the hostname. 
InetAddress addr = InetAddress.getLocalHost(); String hostname = addr.getHostName(); // Find the service registrar (unicast protocol). final int timeout = 4*1000; // seconds. System.err.println("hostname: "+hostname); LookupLocator lookupLocator = new LookupLocator("jini://"+hostname); ServiceRegistrar serviceRegistrar = lookupLocator.getRegistrar( timeout ); /* * Prepare a template for lookup search. * * Note: The client needs a local copy of the interface in order to be * able to invoke methods on the service without using reflection. The * implementation class will be downloaded from the codebase identified * by the server. */ ServiceTemplate template = new ServiceTemplate(// /* * use this to request the service by its serviceID. */ serviceID, /* * Use this to filter services by an interface that they expose. */ // new Class[] { IDataService.class }, null, /* * use this to filter for services by Entry attributes. */ null); /* * Lookup a service. This can fail if the service registrar has not * finished processing the service registration. If it does, you can * generally just retry the test and it will succeed. However this * points out that the client may need to wait and retry a few times if * you are starting everthing up at once (or just register for * notification events for the service if it is not found and enter a * wait state). */ IDataService service = null; for (int i = 0; i < 10 && service == null; i++) { service = (IDataService) serviceRegistrar .lookup(template /* , maxMatches */); if (service == null) { System.err.println("Service not found: sleeping..."); Thread.sleep(200); } } if(service!=null) { System.err.println("Service found."); } return service; } } |
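The KEYS and VALS constants used by the test above are plain int bit flags: composed with bitwise OR (as in IDataService.KEYS | IDataService.VALS) and decoded with a mask-and-compare against zero. A small illustrative sketch follows; the constant values are copied from the IDataService diff, while the decode helper itself is hypothetical. Note that the mask-and-compare form (!= 0) used by RangeQueryTask is the safe one, whereas the == 1 comparison in the commented-out RangeQueryEnum.isSet would only ever match the lowest bit.

```java
public class RangeFlagsSketch {

    // Flag values copied from IDataService.
    public static final int KEYS = 1 << 0;
    public static final int VALS = 1 << 1;

    /**
     * Decode the flags the way RangeQueryTask does: mask the bit and
     * compare against zero, so the test works for any bit position.
     */
    static String decode(int flags) {
        boolean sendKeys = (flags & KEYS) != 0;
        boolean sendVals = (flags & VALS) != 0;
        return "sendKeys=" + sendKeys + ", sendVals=" + sendVals;
    }

    public static void main(String[] args) {
        System.out.println(decode(KEYS | VALS)); // sendKeys=true, sendVals=true
        System.out.println(decode(VALS));        // sendKeys=false, sendVals=true
    }
}
```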
From: Bryan T. <tho...@us...> - 2007-04-20 16:36:32
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv16005/src/java/com/bigdata/journal Modified Files: IIndexManager.java Log Message: Updated the IDataService interface and now have client talking over JERI to a data service instance for various btree operations, range count, and range query. Index: IIndexManager.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/IIndexManager.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** IIndexManager.java 13 Apr 2007 15:04:23 -0000 1.4 --- IIndexManager.java 20 Apr 2007 16:36:27 -0000 1.5 *************** *** 109,112 **** --- 109,116 ---- * if <i>name</i> does not identify a registered index. * + * @todo reconsider the deletion of index resources. They should probably be + * explicitly purged by a restart-safe delete from the metadata + * service in due time. + * + * @todo add a rename index method, but note that names in the file system + * would not change. |
From: Bryan T. <tho...@us...> - 2007-04-19 19:13:50
Update of /cvsroot/cweb/bigdata-rdf/src/test/com/bigdata/rdf/inf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv3033/src/test/com/bigdata/rdf/inf Modified Files: TestRuleRdfs11.java TestRuleRdf01.java Log Message: Optimization for Rule rdf1. Index: TestRuleRdfs11.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/test/com/bigdata/rdf/inf/TestRuleRdfs11.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** TestRuleRdfs11.java 18 Apr 2007 17:29:06 -0000 1.2 --- TestRuleRdfs11.java 19 Apr 2007 19:13:46 -0000 1.3 *************** *** 51,56 **** import org.openrdf.vocabulary.RDFS; - import com.bigdata.rdf.TempTripleStore; - import com.bigdata.rdf.inf.Rule.Stats; import com.bigdata.rdf.model.OptimizedValueFactory._URI; --- 51,54 ---- *************** *** 76,79 **** --- 74,80 ---- } + /** + * Simple test verifies inference of a subclassof entailment. + */ public void test_rdfs11() { Index: TestRuleRdf01.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/test/com/bigdata/rdf/inf/TestRuleRdf01.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TestRuleRdf01.java 19 Apr 2007 12:27:38 -0000 1.1 --- TestRuleRdf01.java 19 Apr 2007 19:13:46 -0000 1.2 *************** *** 48,51 **** --- 48,56 ---- package com.bigdata.rdf.inf; + import org.openrdf.model.URI; + import org.openrdf.vocabulary.RDF; + + import com.bigdata.rdf.model.OptimizedValueFactory._URI; + /** * Test suite for {@link RuleRdf01}. *************** *** 75,86 **** /** ! * FIXME write test of basic rule semantics and then write another test that ! * can be used to verify that we are doing an efficient scan for the ! * distinct predicates (key prefix scan). */ public void test_rdf01() { ! fail("write test"); } --- 80,151 ---- /** ! * Basic of rule semantics. */ public void test_rdf01() { ! URI A = new _URI("http://www.foo.org/A"); ! 
URI B = new _URI("http://www.foo.org/B"); ! URI C = new _URI("http://www.foo.org/C"); ! ! URI rdfType = new _URI(RDF.TYPE); ! URI rdfProperty = new _URI(RDF.PROPERTY); ! ! store.addStatement(A, B, C); ! ! assertTrue(store.containsStatement(A, B, C)); ! assertFalse(store.containsStatement(B, rdfType, rdfProperty )); ! ! applyRule(store.rdf1, 1/* numComputed */, 1/* numCopied */); ! ! /* ! * validate the state of the primary store. ! */ ! assertTrue(store.containsStatement(A, B, C)); ! assertTrue(store.containsStatement(B, rdfType, rdfProperty )); ! ! } ! ! /** ! * Test that can be used to verify that we are doing an efficient scan for ! * the distinct predicates (distinct key prefix scan). ! */ ! public void test_rdf01_distinctPrefixScan() { + URI A = new _URI("http://www.foo.org/A"); + URI B = new _URI("http://www.foo.org/B"); + URI C = new _URI("http://www.foo.org/C"); + URI D = new _URI("http://www.foo.org/D"); + URI E = new _URI("http://www.foo.org/E"); + + URI rdfType = new _URI(RDF.TYPE); + URI rdfProperty = new _URI(RDF.PROPERTY); + + /* + * Three statements that will trigger the rule, but two statements share + * the same predicate. When it does the minimum amount of work, the rule + * will fire for each distinct predicate in the KB -- for this KB that + * is only twice. + */ + store.addStatement(A, B, C); + store.addStatement(C, B, D); + store.addStatement(A, E, C); + + assertTrue(store.containsStatement(A, B, C)); + assertTrue(store.containsStatement(C, B, D)); + assertTrue(store.containsStatement(A, E, C)); + assertFalse(store.containsStatement(B, rdfType, rdfProperty )); + assertFalse(store.containsStatement(E, rdfType, rdfProperty )); + + applyRule(store.rdf1, 2/* numComputed */, 2/* numCopied */); + + /* + * validate the state of the primary store. 
+ */ + assertTrue(store.containsStatement(A, B, C)); + assertTrue(store.containsStatement(C, B, D)); + assertTrue(store.containsStatement(A, E, C)); + assertTrue(store.containsStatement(B, rdfType, rdfProperty )); + assertTrue(store.containsStatement(E, rdfType, rdfProperty )); + } |
From: Bryan T. <tho...@us...> - 2007-04-19 19:13:50
Update of /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv3033/src/java/com/bigdata/rdf/inf Modified Files: RuleRdfs02.java AbstractRuleRdf.java Rule.java RuleRdfs09.java RuleRdf01.java RuleRdfs03.java Log Message: Optimization for Rule rdf1. Index: RuleRdf01.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf/RuleRdf01.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** RuleRdf01.java 18 Apr 2007 17:29:07 -0000 1.9 --- RuleRdf01.java 19 Apr 2007 19:13:45 -0000 1.10 *************** *** 44,48 **** package com.bigdata.rdf.inf; ! import com.bigdata.btree.IEntryIterator; import com.bigdata.rdf.KeyOrder; --- 44,50 ---- package com.bigdata.rdf.inf; ! import java.util.ArrayList; ! import java.util.Iterator; ! import com.bigdata.rdf.KeyOrder; *************** *** 70,111 **** final long computeStart = System.currentTimeMillis(); ! long lastP = -1; ! ! /* ! * This is essentially doing a "select distinct predicate". ! * ! * FIXME there should be a lighter weight way of achieving this result. ! * One way would be to create a "predicates" index that had each ! * distinct predicate (further denormalizing the schema and requiring ! * maintenance). Another approach is to restart the iterator each time a ! * predicate [p] is found by computing the fromKey as [p+1] thereby ! * skipping over all intervening statements in the index (consider how ! * to distribute that query in parallel). ! * ! * @todo write a test for this rule and then write an optimized variant ! * using the incremented toKey approach. ! */ ! IEntryIterator it = store.getPOSIndex().rangeIterator(null,null); ! ! while ( it.hasNext() ) { ! ! it.next(); ! ! stats.stmts1++; ! SPO stmt = new SPO(KeyOrder.POS, store.keyBuilder, it.getKey()); ! if ( stmt.p != lastP ) { ! ! 
lastP = stmt.p; - buffer.add(new SPO(stmt.p, store.rdfType.id, - store.rdfProperty.id)); - stats.numComputed++; ! } ! ! } stats.computeTime += System.currentTimeMillis() - computeStart; --- 72,135 ---- final long computeStart = System.currentTimeMillis(); ! // if (false) { ! // ! // /* ! // * Original implementation does a full scan of all statements in the ! // * KB. ! // */ ! // ! // long lastP = NULL; ! // ! // IEntryIterator it = store.getPOSIndex().rangeIterator(null, null); ! // ! // while (it.hasNext()) { ! // ! // it.next(); ! // ! // stats.stmts1++; ! // ! // SPO stmt = new SPO(KeyOrder.POS, store.keyBuilder, it.getKey()); ! // ! // if (stmt.p != lastP) { ! // ! // lastP = stmt.p; ! // ! // stats.numComputed++; ! // ! // System.err.println("" + stats.numComputed + " : " + lastP ! // + " : " + store.toString(lastP)); ! // ! // buffer.add(new SPO(stmt.p, store.rdfType.id, ! // store.rdfProperty.id)); ! // ! // } ! // ! // } ! // ! // } else { ! /* ! * Alternative implementation does an efficient scan for only the ! * distinct predicates in the KB. ! */ ! // find the distinct predicates in the KB. ! ArrayList<Long> ids = store.distinctTermScan(KeyOrder.POS); ! ! Iterator<Long> itr = ids.iterator(); ! ! while (itr.hasNext()) { ! ! stats.stmts1++; ! ! long p = itr.next(); ! ! buffer.add(new SPO(p, store.rdfType.id, store.rdfProperty.id)); stats.numComputed++; ! } ! ! // } stats.computeTime += System.currentTimeMillis() - computeStart; Index: RuleRdfs02.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf/RuleRdfs02.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** RuleRdfs02.java 18 Apr 2007 17:29:07 -0000 1.2 --- RuleRdfs02.java 19 Apr 2007 19:13:45 -0000 1.3 *************** *** 44,49 **** package com.bigdata.rdf.inf; ! ! public class RuleRdfs02 extends AbstractRuleRdfs_2_3_7_9 { --- 44,57 ---- package com.bigdata.rdf.inf; ! /** ! * rdfs2: ! 
* <pre> ! * triple( u rdf:type x) :- ! * triple( a rdfs:domain x), ! * triple( u a y ). ! * </pre> ! * @author <a href="mailto:tho...@us...">Bryan Thompson</a> ! * @version $Id$ ! */ public class RuleRdfs02 extends AbstractRuleRdfs_2_3_7_9 { Index: RuleRdfs09.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf/RuleRdfs09.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** RuleRdfs09.java 18 Apr 2007 17:29:07 -0000 1.5 --- RuleRdfs09.java 19 Apr 2007 19:13:45 -0000 1.6 *************** *** 53,59 **** * triple(?v,rdf:type,?u). * </pre> - * <pre> - * <u rdfs:subClassOf x> AND <v rdf:type u> IMPLIES <v rdf:type x> - * </pre> */ public class RuleRdfs09 extends AbstractRuleRdfs_2_3_7_9 { --- 53,56 ---- Index: Rule.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf/Rule.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** Rule.java 18 Apr 2007 17:29:07 -0000 1.7 --- Rule.java 19 Apr 2007 19:13:45 -0000 1.8 *************** *** 178,181 **** --- 178,188 ---- long computeTime; + public String toString() { + + return ", #stmts1=" + stmts1 + ", #stmts2=" + stmts2 + + ", #subqueries=" + numSubqueries + ", numConsidered+" + + numConsidered + ", computeTime=" + computeTime; + } + } Index: AbstractRuleRdf.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf/AbstractRuleRdf.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** AbstractRuleRdf.java 14 Apr 2007 13:33:51 -0000 1.8 --- AbstractRuleRdf.java 19 Apr 2007 19:13:45 -0000 1.9 *************** *** 44,51 **** package com.bigdata.rdf.inf; - import org.openrdf.model.URI; - - import com.bigdata.btree.IIndex; - public abstract class AbstractRuleRdf extends Rule { --- 44,47 ---- Index: 
RuleRdfs03.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf/RuleRdfs03.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** RuleRdfs03.java 18 Apr 2007 17:29:07 -0000 1.2 --- RuleRdfs03.java 19 Apr 2007 19:13:45 -0000 1.3 *************** *** 44,49 **** package com.bigdata.rdf.inf; ! ! public class RuleRdfs03 extends AbstractRuleRdfs_2_3_7_9 { --- 44,59 ---- package com.bigdata.rdf.inf; ! /** ! * rdfs3: ! * ! * <pre> ! * triple(v rdf:type x) :- ! * triple(a rdfs:range x), ! * triple(u a v). ! * </pre> ! * ! * @author <a href="mailto:tho...@us...">Bryan Thompson</a> ! * @version $Id$ ! */ public class RuleRdfs03 extends AbstractRuleRdfs_2_3_7_9 { |
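The diffs above replace RuleRdf01's full scan of the POS index with a scan over only the distinct predicates, so the rule fires once per predicate rather than once per statement. As a minimal standalone sketch of that semantics (plain Java collections with hypothetical names, not the bigdata API):

```java
import java.util.*;

public class Rdf1Demo {

    /** A simple RDF triple; stands in for bigdata's SPO. */
    record Triple(String s, String p, String o) {}

    /**
     * rdf1: for every distinct predicate p appearing in the KB,
     * infer (p, rdf:type, rdf:Property).
     */
    static Set<Triple> applyRdf1(Collection<Triple> kb) {
        // "select distinct predicate" over the KB.
        Set<String> predicates = new LinkedHashSet<>();
        for (Triple t : kb) predicates.add(t.p());

        Set<Triple> inferred = new LinkedHashSet<>();
        for (String p : predicates)
            inferred.add(new Triple(p, "rdf:type", "rdf:Property"));
        return inferred;
    }

    public static void main(String[] args) {
        List<Triple> kb = List.of(
                new Triple("A", "B", "C"),
                new Triple("C", "B", "D"),  // shares predicate B
                new Triple("A", "E", "C"));
        // Two distinct predicates (B, E) => exactly two inferences,
        // matching numComputed == 2 in test_rdf01_distinctPrefixScan.
        System.out.println(applyRdf1(kb));
    }
}
```

This is why `test_rdf01_distinctPrefixScan` loads three statements but expects only two computed entailments: the rule's work is bounded by the number of distinct predicates, not the number of statements.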
From: Bryan T. <tho...@us...> - 2007-04-19 19:13:50
Update of /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv3033/src/java/com/bigdata/rdf Modified Files: KeyOrder.java TripleStore.java Log Message: Optimization for Rule rdf1. Index: KeyOrder.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/KeyOrder.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** KeyOrder.java 12 Apr 2007 23:59:21 -0000 1.2 --- KeyOrder.java 19 Apr 2007 19:13:46 -0000 1.3 *************** *** 56,59 **** --- 56,65 ---- public enum KeyOrder { + /** + * @todo make the case of the index name and the case of the enums the same + * (either both upper or both lower). Enums naturally convert to + * strings based on their case, so SPO.toString() is "SPO" but + * SPO.name is "spo", which is confusing. + */ SPO("spo",0), POS("pos",1), Index: TripleStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/TripleStore.java,v retrieving revision 1.32 retrieving revision 1.33 diff -C2 -d -r1.32 -r1.33 *** TripleStore.java 19 Apr 2007 13:22:31 -0000 1.32 --- TripleStore.java 19 Apr 2007 19:13:46 -0000 1.33 *************** *** 54,57 **** --- 54,58 ---- import java.io.InputStreamReader; import java.io.Reader; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; *************** *** 282,288 **** final String name_idTerm = "idTerm"; ! final String name_spo = "spo"; ! final String name_pos = "pos"; ! final String name_osp = "osp"; /* --- 283,289 ---- final String name_idTerm = "idTerm"; ! final String name_spo = KeyOrder.SPO.name; ! final String name_pos = KeyOrder.POS.name; ! final String name_osp = KeyOrder.OSP.name; /* *************** *** 366,369 **** --- 367,392 ---- } + /** + * Return the statement index identified by the {@link KeyOrder}. + * + * @param keyOrder The key order. 
+ * + * @return The statement index for that access path. + */ + public IIndex getStatementIndex(KeyOrder keyOrder) { + + switch (keyOrder) { + case SPO: + return getSPOIndex(); + case POS: + return getPOSIndex(); + case OSP: + return getOSPIndex(); + default: + throw new IllegalArgumentException("Unknown: " + keyOrder); + } + + } + public IIndex getSPOIndex() { *************** *** 621,635 **** public String toString( long s, long p, long o ) { IIndex ndx = getIdTermIndex(); ! Resource s1 = (Resource) ndx.lookup(keyBuilder.id2key(s)); ! ! URI p1 = (URI) ndx.lookup(keyBuilder.id2key(p)); ! ! Value o1 = (Value) ndx.lookup(keyBuilder.id2key(o)); ! ! return ("< " + (s1 instanceof URI ? abbrev((URI) s1) : s1) + ", " ! + abbrev(p1) + ", " ! + (o1 instanceof URI ? abbrev((URI) o1) : o1) + " >"); } --- 644,666 ---- public String toString( long s, long p, long o ) { + return ("< " + toString(s) + ", " + toString(p) + ", " + toString(o) +" >"); + + } + + /** + * Externalizes a term using an abbreviated syntax. + * + * @param termId + * The term identifier. + * + * @return A representation of the term. + */ + public String toString( long termId ) { + IIndex ndx = getIdTermIndex(); ! Value v = (Value) ndx.lookup(keyBuilder.id2key(termId)); ! ! return (v instanceof URI ? abbrev((URI) v) : v.toString()); } *************** *** 1860,1862 **** --- 1891,1968 ---- } + /** + * Performs an efficient scan of a statement index returning the distinct + * term identifiers found in the first key component for the named access + * path. Depending on which access path you are using, this will be the term + * identifiers for the distinct subjects, predicates, or values in the KB. + * + * @param keyOrder + * Names the access path. 
Use {@link KeyOrder#SPO} to get the + * term identifiers for the distinct subjects, + * {@link KeyOrder#POS} to get the term identifiers for the + * distinct predicates, and {@link KeyOrder#OSP} to get the term + * identifiers for the distinct objects + * + * @return The distinct term identifiers in the first key slot for the + * triples in that index. + */ + public ArrayList<Long> distinctTermScan(KeyOrder keyOrder) { + + /* + * The implementation uses a key scan to find the first term identifer + * for the given index. It then forms a fromKey that starts at the next + * possible term identifier and does another scan, thereby obtaining the + * 2nd distinct term identifier for that position on that index. This + * process is repeated iteratively until the key scan no longer + * identifies a match. This approach skips quickly over regions of the + * index which have many statements for the same term and makes N+1 + * queries to identify N distinct terms. Note that there is no way to + * pre-compute the #of distinct terms that will be identified short of + * running the queries. + */ + ArrayList<Long> ids = new ArrayList<Long>(1000); + + byte[] fromKey = null; + + final byte[] toKey = null; + + IIndex ndx = getStatementIndex(keyOrder); + + IEntryIterator itr = ndx.rangeIterator(fromKey, toKey); + + long[] tmp = new long[3]; + + while(itr.hasNext()) { + + itr.next(); + + // extract the term ids from the key. + keyBuilder.key2Statement(itr.getKey(), tmp); + + final long id = tmp[0]; + + // append tmp[0] to the output list. + ids.add(id); + + // System.err.println(ids.size() + " : " + id + " : " + // + toString(id)); + + // restart scan at the next possible term id. + + final long nextId = id + 1; + + fromKey = keyBuilder.statement2Key(nextId, NULL, NULL); + + // new iterator. + itr = ndx.rangeIterator(fromKey, toKey); + + } + + // System.err.println("Distinct key scan: KeyOrder=" + keyOrder + // + ", #terms=" + ids.size()); + + return ids; + + } + } |
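The `distinctTermScan` method added above works by restarting the range scan at the next possible term identifier (`id + 1`) each time a term is found, skipping every remaining key that shares the current prefix and making N+1 probes to find N distinct terms. A minimal standalone sketch of that skip-scan over an in-memory sorted index (plain `TreeSet`, hypothetical names, not the bigdata B+Tree API):

```java
import java.util.*;

public class SkipScanDemo {

    /**
     * Returns the distinct values of the first key component, using one
     * ordered probe per distinct value instead of a full scan.
     */
    static List<Long> distinctFirst(NavigableSet<long[]> index) {
        List<Long> ids = new ArrayList<>();
        long[] probe = index.isEmpty() ? null : index.first();
        while (probe != null) {
            long id = probe[0];
            ids.add(id);
            // Restart the scan at the next possible first-component value,
            // skipping all remaining keys that share the current prefix.
            probe = index.ceiling(new long[] { id + 1, 0, 0 });
        }
        return ids;
    }

    public static void main(String[] args) {
        // Statements in POS order: key = {p, o, s}.
        Comparator<long[]> cmp = (a, b) -> {
            for (int i = 0; i < 3; i++) {
                int c = Long.compare(a[i], b[i]);
                if (c != 0) return c;
            }
            return 0;
        };
        NavigableSet<long[]> pos = new TreeSet<>(cmp);
        pos.add(new long[] { 7, 1, 2 });
        pos.add(new long[] { 7, 3, 4 }); // same predicate 7
        pos.add(new long[] { 9, 1, 2 });

        System.out.println(distinctFirst(pos)); // prints [7, 9]
    }
}
```

On a disk-backed B+Tree the same trick pays off far more, since each restart skips whole leaf runs of statements sharing a predicate rather than iterating through them.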
From: Bryan T. <tho...@us...> - 2007-04-19 13:22:34
Update of /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv797/src/java/com/bigdata/rdf Modified Files: TripleStore.java Log Message: javadoc edits. Index: TripleStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/TripleStore.java,v retrieving revision 1.31 retrieving revision 1.32 diff -C2 -d -r1.31 -r1.32 *** TripleStore.java 18 Apr 2007 17:29:08 -0000 1.31 --- TripleStore.java 19 Apr 2007 13:22:31 -0000 1.32 *************** *** 105,110 **** * A triple store based on the <em>bigdata</em> architecture. * - * @todo verify that re-loading the same data does not cause index writes. - * * @todo Refactor to support transactions and concurrent load/query and test * same. --- 105,108 ---- *************** *** 153,170 **** * where appropriate, so we need to assign identifiers to bnodes in a * restart-safe manner even if we "forget" the term-id mapping. ! * * @todo modify the term identifier assignment mechanism to be compatible with * the scale-out index partitions (32-bit unique within index partition ! * identified plus a restart-safe counter for each index partition). * * @todo Refactor to use a delegation mechanism so that we can run with or * without partitioned indices? (All you have to do now is change the * class that is being extended from Journal to MasterJournal and handle ! * some different initialization properties.) * * @todo the only added cost for a quad store is the additional statement * indices. There are only three more statement indices in a quad store. * Since statement indices are so cheap, it is probably worth implementing ! * them now, even if only as a configuration option. * * @todo verify read after commit (restart safe) for large data sets and test --- 151,171 ---- * where appropriate, so we need to assign identifiers to bnodes in a * restart-safe manner even if we "forget" the term-id mapping. ! 
* * @todo modify the term identifier assignment mechanism to be compatible with * the scale-out index partitions (32-bit unique within index partition ! * identified plus a restart-safe counter for each index partition). * * @todo Refactor to use a delegation mechanism so that we can run with or * without partitioned indices? (All you have to do now is change the * class that is being extended from Journal to MasterJournal and handle ! * some different initialization properties.) In fact, the "triple store" ! * should be a client that uses partitioned indices to talk to metadata ! * and data services. * * @todo the only added cost for a quad store is the additional statement * indices. There are only three more statement indices in a quad store. * Since statement indices are so cheap, it is probably worth implementing ! * them now, even if only as a configuration option. (There may be reasons ! * to maintain both versions.) * * @todo verify read after commit (restart safe) for large data sets and test *************** *** 184,193 **** * * @todo support metadata about the statement, e.g., whether or not it is an ! * inference. ! * ! * @todo compute the MB/sec rate at which the store can load data and compare it ! * with the maximum transfer rate for the journal without the btree and ! * the maximum transfer rate to disk. this will tell us the overhead of ! * the btree implementation. * * @todo Try a variant in which we have metadata linking statements and terms --- 185,192 ---- * * @todo support metadata about the statement, e.g., whether or not it is an ! * inference. consider that we may need to move the triple/quad ids into ! * the value in the statement indices since some key compression schemes ! * are not reversable (we depend on reversable keys to extract the term ! * ids for a statement). * * @todo Try a variant in which we have metadata linking statements and terms *************** *** 217,220 **** --- 216,240 ---- * for more thought. 
* + * @todo examine role for semi joins for a Sesame 2.x integration (quad store + * with real query operators). semi-joins (join indices) can be declared + * for various predicate combinations and then maintained. The + * declarations can be part of the scale-out index metadata. The logic + * that handles batch data load can also maintain the join indices. While + * triggers could be used for this purpose, there would need to be a means + * to aggregate and order the triggered events and then redistribute them + * against the partitions of the join indices. If the logic is in the + * client, then we need to make sure that newly declared join indices are + * fully populated (e.g., clients are notified to start building the join + * index and then we start the index build from existing data to remove + * any chance that the join index would be incomplete - the index would be + * ready as soon as the index build completes and client operations would + * be in a maintenance role). + * + * @todo provide option for closing aspects of the entire store vs just a single + * context in a quad store. For example, in an open web and internet scale + * kb it is unlikely that you would want to have all harvested ontologies + * closed against all the data. however, that might make more sense in a + * more controlled setting. + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ |
From: Bryan T. <tho...@us...> - 2007-04-19 12:28:02
Update of /cvsroot/cweb/bigdata-rdf/src/test/com/bigdata/rdf/inf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv14067/src/test/com/bigdata/rdf/inf Modified Files: TestAll.java Added Files: TestRuleRdf01.java Log Message: Added a test case for rdf1, but I have not written the tests yet. --- NEW FILE: TestRuleRdf01.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Apr 18, 2007 */ package com.bigdata.rdf.inf; /** * Test suite for {@link RuleRdf01}. * * <pre> * triple(?v rdf:type rdf:Property) :- * triple( ?u ?v ?x ). 
* </pre> * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class TestRuleRdf01 extends AbstractRuleTestCase { /** * */ public TestRuleRdf01() { } /** * @param name */ public TestRuleRdf01(String name) { super(name); } /** * FIXME write test of basic rule semantics and then write another test that * can be used to verify that we are doing an efficient scan for the * distinct predicates (key prefix scan). */ public void test_rdf01() { fail("write test"); } } Index: TestAll.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/test/com/bigdata/rdf/inf/TestAll.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** TestAll.java 18 Apr 2007 17:29:06 -0000 1.2 --- TestAll.java 19 Apr 2007 12:27:37 -0000 1.3 *************** *** 78,81 **** --- 78,83 ---- TestSuite suite = new TestSuite("RDFS inference"); + suite.addTestSuite( TestRuleRdf01.class ); + suite.addTestSuite( TestRuleRdfs07.class ); |