From: Bryan T. <tho...@us...> - 2007-04-25 14:57:20
|
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv16408/src/java/com/bigdata/service Modified Files: AbstractServer.java DataServer.java ClientIndexView.java IMetadataService.java JiniUtil.java IDataService.java MetadataService.java DataService.java MetadataServer.java BigdataClient.java Added Files: DataServiceMap.java Removed Files: EmbeddedDataService.java DataServiceClient.java Log Message: Progress on the BigdataClient. Index: DataServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServer.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** DataServer.java 23 Apr 2007 13:09:29 -0000 1.7 --- DataServer.java 25 Apr 2007 14:57:10 -0000 1.8 *************** *** 51,54 **** --- 51,55 ---- import java.rmi.RemoteException; import java.util.Properties; + import java.util.UUID; import net.jini.config.Configuration; *************** *** 160,163 **** --- 161,165 ---- protected AbstractServer server; + private UUID serviceUUID; public AdministrableDataService(AbstractServer server,Properties properties) { *************** *** 205,208 **** --- 207,222 ---- } + protected UUID getDataServiceUUID() { + + if(serviceUUID==null) { + + serviceUUID = JiniUtil.serviceID2UUID(server.getServiceID()); + + } + + return serviceUUID; + + } + // /* // * JoinAdmin --- NEW FILE: DataServiceMap.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. 
You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Apr 25, 2007 */ package com.bigdata.service; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import net.jini.core.lookup.ServiceID; import net.jini.core.lookup.ServiceItem; import net.jini.lookup.LookupCache; import net.jini.lookup.ServiceDiscoveryEvent; import net.jini.lookup.ServiceDiscoveryListener; import net.jini.lookup.ServiceDiscoveryManager; /** * A mapping from {@link ServiceID} to {@link ServiceItem} that is maintained by * a suitable {@link ServiceDiscoveryManager} manager. In order to use this * class, you must register it as the {@link ServiceDiscoveryListener} with the * {@link ServiceDiscoveryManager}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class DataServiceMap implements ServiceDiscoveryListener { public static final transient Logger log = Logger .getLogger(DataServiceMap.class); private Map<ServiceID, ServiceItem> serviceIdMap = new ConcurrentHashMap<ServiceID, ServiceItem>(); public DataServiceMap() { } /* * ServiceDiscoveryListener. 
*/ /** * Adds the {@link ServiceItem} to the internal map to support * {@link #getDataServiceByID(ServiceID)}. The entry is keyed by the * {@link ServiceID} of the post-event {@link ServiceItem}. * <p> * Note: This event is generated by the {@link LookupCache}. There is an * event for each {@link DataService} as it joins any registrar in the set * of registrars to which the {@link MetadataServer} is listening. The set * of distinct joined {@link DataService}s is accessible via the * {@link LookupCache}. * * @param e * The discovery event; its post-event {@link ServiceItem} is * logged and cached. */ public void serviceAdded(ServiceDiscoveryEvent e) { log.info("" + e + ", class=" + e.getPostEventServiceItem().toString()); serviceIdMap.put(e.getPostEventServiceItem().serviceID, e .getPostEventServiceItem()); } /** * Replaces the entry in the internal map for the changed service with the * post-event {@link ServiceItem} (this is NOT a NOP - the map is updated * exactly as in {@link #serviceAdded(ServiceDiscoveryEvent)}). * * @param e * The discovery event; its post-event {@link ServiceItem} is * logged and cached. */ public void serviceChanged(ServiceDiscoveryEvent e) { log.info(""+e+", class=" + e.getPostEventServiceItem().toString()); serviceIdMap.put(e.getPostEventServiceItem().serviceID, e .getPostEventServiceItem()); } /** * Removes the entry for the service from the internal map (this is NOT a * NOP). The entry is identified by the {@link ServiceID} of the pre-event * {@link ServiceItem} since that is the item being removed. * * @param e * The discovery event; its pre-event {@link ServiceItem} is * logged and its {@link ServiceID} is removed from the map. */ public void serviceRemoved(ServiceDiscoveryEvent e) { log.info(""+e+", class=" + e.getPreEventServiceItem().toString()); serviceIdMap.remove(e.getPreEventServiceItem().serviceID); } /* * Our own API. */ /** * Resolve the {@link ServiceID} for a {@link DataService} to the cached * {@link ServiceItem} for that {@link DataService}. * * @param serviceID * The {@link ServiceID} for the {@link DataService}. * * @return The cached {@link ServiceItem} for that {@link DataService} -or- * <code>null</code> if there is no entry in the internal map for * that {@link ServiceID}. */ public ServiceItem getDataServiceByID(ServiceID serviceID) { return serviceIdMap.get(serviceID); } /** * Return the #of {@link DataService}s known to this {@link MetadataServer}. * * @return The #of {@link DataService}s in the {@link LookupCache}. 
*/ public int getDataServiceCount() { return serviceIdMap.size(); } } Index: IDataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IDataService.java,v retrieving revision 1.11 retrieving revision 1.12 diff -C2 -d -r1.11 -r1.12 *** IDataService.java 23 Apr 2007 17:22:14 -0000 1.11 --- IDataService.java 25 Apr 2007 14:57:11 -0000 1.12 *************** *** 147,150 **** --- 147,164 ---- /** + * Return the unique index identifier for the named index (synchronous, + * unisolated). + * + * @param name + * The index name. + * + * @return The index UUID -or- <code>null</code> if the index is not + * registered on this {@link IDataService}. + * + * @throws IOException + */ + public UUID getIndexUUID(String name) throws IOException; + + /** * Drops the named index (unisolated). * *************** *** 229,235 **** * @todo support extension operations (read or mutable). */ ! public void batchInsert(long tx, String name, int ntuples, byte[][] keys, ! byte[][] values) throws InterruptedException, ExecutionException, ! IOException; public boolean[] batchContains(long tx, String name, int ntuples, --- 243,249 ---- * @todo support extension operations (read or mutable). */ ! public byte[][] batchInsert(long tx, String name, int ntuples, ! byte[][] keys, byte[][] values, boolean returnOldValues) ! 
throws InterruptedException, ExecutionException, IOException; public boolean[] batchContains(long tx, String name, int ntuples, Index: MetadataServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MetadataServer.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** MetadataServer.java 23 Apr 2007 23:44:42 -0000 1.6 --- MetadataServer.java 25 Apr 2007 14:57:11 -0000 1.7 *************** *** 51,57 **** import java.rmi.Remote; import java.rmi.RemoteException; - import java.util.Map; import java.util.Properties; ! import java.util.concurrent.ConcurrentHashMap; import net.jini.core.lookup.ServiceID; --- 51,56 ---- import java.rmi.Remote; import java.rmi.RemoteException; import java.util.Properties; ! import java.util.UUID; import net.jini.core.lookup.ServiceID; *************** *** 60,65 **** import net.jini.lease.LeaseRenewalManager; import net.jini.lookup.LookupCache; - import net.jini.lookup.ServiceDiscoveryEvent; - import net.jini.lookup.ServiceDiscoveryListener; import net.jini.lookup.ServiceDiscoveryManager; import net.jini.lookup.ServiceItemFilter; --- 59,62 ---- *************** *** 91,99 **** * @version $Id$ */ ! public class MetadataServer extends DataServer implements ServiceDiscoveryListener { private ServiceDiscoveryManager serviceDiscoveryManager = null; private LookupCache dataServiceLookupCache = null; ! private Map<ServiceID, ServiceItem> serviceIdMap = new ConcurrentHashMap<ServiceID, ServiceItem>(); protected LookupCache getDataServiceLookupCache() { --- 88,101 ---- * @version $Id$ */ ! public class MetadataServer extends DataServer { private ServiceDiscoveryManager serviceDiscoveryManager = null; private LookupCache dataServiceLookupCache = null; ! ! /** ! * Provides direct cached lookup of {@link DataService}s by their ! * {@link ServiceID}. ! */ ! 
public DataServiceMap dataServiceMap = new DataServiceMap(); protected LookupCache getDataServiceLookupCache() { *************** *** 139,143 **** dataServiceLookupCache = serviceDiscoveryManager .createLookupCache(template, ! new DataServiceFilter() /* filter */, this /* ServiceDiscoveryListener */); } catch(RemoteException ex) { --- 141,145 ---- dataServiceLookupCache = serviceDiscoveryManager .createLookupCache(template, ! new DataServiceFilter() /* filter */, dataServiceMap/* ServiceDiscoveryListener */); } catch(RemoteException ex) { *************** *** 210,288 **** } - - /* - * ServiceDiscoveryListener. - */ - - /** - * Adds the {@link ServiceItem} to the internal map to support - * {@link #getServiceByID()} - * <p> - * Note: This event is generated by the {@link LookupCache}. There is an - * event for each {@link DataService} as it joins any registrar in the set - * of registrars to which the {@link MetadataServer} is listening. The set - * of distinct joined {@link DataService}s is accessible via the - * {@link LookupCache}. - */ - public void serviceAdded(ServiceDiscoveryEvent e) { - - log.info("" + e + ", class=" - + e.getPostEventServiceItem().toString()); - - serviceIdMap.put(e.getPostEventServiceItem().serviceID, e - .getPostEventServiceItem()); - - } - - /** - * NOP. - */ - public void serviceChanged(ServiceDiscoveryEvent e) { - - log.info(""+e+", class=" - + e.getPostEventServiceItem().toString()); - - serviceIdMap.put(e.getPostEventServiceItem().serviceID, e - .getPostEventServiceItem()); - - } - - /** - * NOP. - */ - public void serviceRemoved(ServiceDiscoveryEvent e) { - - log.info(""+e+", class=" - + e.getPreEventServiceItem().toString()); - - serviceIdMap.remove(e.getPreEventServiceItem().serviceID); - - } - - /** - * Resolve the {@link ServiceID} for a {@link DataService} to the cached - * {@link ServiceItem} for that {@link DataService}. - * - * @param serviceID - * The {@link ServiceID} for the {@link DataService}. 
- * - * @return The cache {@link ServiceItem} for that {@link DataService}. - */ - public ServiceItem getDataServiceByID(ServiceID serviceID) { - - return serviceIdMap.get(serviceID); - - } - - /** - * Return the #of {@link DataService}s known to this {@link MetadataServer}. - * - * @return The #of {@link DataService}s in the {@link LookupCache}. - */ - public int getDataServiceCount() { - - return serviceIdMap.size(); - - } /** --- 212,215 ---- *************** *** 310,314 **** protected MetadataServer server; ! /** * @param properties --- 237,242 ---- protected MetadataServer server; ! private UUID serviceUUID; ! /** * @param properties *************** *** 330,333 **** --- 258,273 ---- } + protected UUID getDataServiceUUID() { + + if (serviceUUID == null) { + + serviceUUID = JiniUtil.serviceID2UUID(server.getServiceID()); + + } + + return serviceUUID; + + } + /** * Return the UUID of an under utilized data service. *************** *** 348,352 **** public IDataService getDataServiceByID(ServiceID serviceID) throws IOException { ! return (IDataService)server.getDataServiceByID(serviceID).service; } --- 288,292 ---- public IDataService getDataServiceByID(ServiceID serviceID) throws IOException { ! 
return (IDataService)server.dataServiceMap.getDataServiceByID(serviceID).service; } --- DataServiceClient.java DELETED --- Index: ClientIndexView.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/ClientIndexView.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** ClientIndexView.java 23 Apr 2007 23:44:42 -0000 1.2 --- ClientIndexView.java 25 Apr 2007 14:57:10 -0000 1.3 *************** *** 48,51 **** --- 48,52 ---- package com.bigdata.service; + import java.io.IOException; import java.util.UUID; *************** *** 56,60 **** --- 57,64 ---- import com.bigdata.btree.IEntryIterator; import com.bigdata.btree.IIndex; + import com.bigdata.scaleup.IPartitionMetadata; import com.bigdata.scaleup.PartitionedIndexView; + import com.bigdata.service.BigdataClient.BigdataFederation; + import com.bigdata.service.BigdataClient.IBigdataFederation; /** *************** *** 81,84 **** --- 85,96 ---- * and can be refactored for this purpose. * + * @todo offer alternatives for the batch insert and remove methods that do not + * return the old values to the client so that the client may opt to + * minimize network traffic for data that it does not need. + * + * @todo develop and offer policies for handling index partitions that are + * unavailable at the time of the request (continued operation during + * partial failure). + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ *************** *** 86,122 **** public class ClientIndexView implements IIndex { ! protected final BigdataClient client; ! protected final String name; ! protected IMetadataService mdproxy; ! protected UUID indexUUID; // @todo final. ! public ClientIndexView(BigdataClient client, String name) { ! if(client==null) throw new IllegalArgumentException(); if(name==null) throw new IllegalArgumentException(); ! this.client = client; ! this.name = name; ! ! /* ! 
* obtain the proxy for a metadata service. if this instance fails, then ! * we can always ask for a new instance for the same distributed ! * database (failover). ! */ ! // mdproxy = client.getMetadataService(); ! ! /* ! * obtain the indexUUID. this is a means of verifying that the index ! * exists. ! */ ! // indexUUID = mdproxy.getIndexUUID(name); } public UUID getIndexUUID() { return indexUUID; --- 98,172 ---- public class ClientIndexView implements IIndex { ! private final BigdataFederation fed; ! private final long tx; ! private final String name; ! ! /** ! * The unique index identifier (initally null and cached once fetched). ! */ ! private UUID indexUUID; ! /** ! * Obtain the proxy for a metadata service. if this instance fails, then we ! * can always ask for a new instance for the same federation (failover). ! */ ! protected IMetadataService getMetadataService() { ! ! return fed.getMetadataService(); ! ! } ! /** ! * Create a view on a scale-out index. ! * ! * @param fed ! * The federation containing the index. ! * @param tx ! * The transaction identifier or zero(0L) iff the index view is ! * not isolated by a transaction. ! * @param name ! * The index name. ! */ ! public ClientIndexView(BigdataFederation fed, long tx, String name) { ! if(fed ==null) throw new IllegalArgumentException(); if(name==null) throw new IllegalArgumentException(); + + if(tx != IBigdataFederation.UNISOLATED) { + + throw new UnsupportedOperationException( + "Only unisolated views are supported at this time"); + + } ! this.fed = fed; ! this.tx = tx; + this.name = name; + } public UUID getIndexUUID() { + if(indexUUID==null) { + + /* + * obtain the UUID for the managed scale-out index. + */ + + try { + + indexUUID = getMetadataService().getManagedIndexUUID(name); + + } catch(IOException ex) { + + throw new RuntimeException(ex); + + } + + } + return indexUUID; *************** *** 124,149 **** public boolean contains(byte[] key) { ! // TODO Auto-generated method stub ! 
return false; } public Object insert(Object key, Object value) { ! // TODO Auto-generated method stub ! return null; } public Object lookup(Object key) { ! // TODO Auto-generated method stub ! return null; } public Object remove(Object key) { ! // TODO Auto-generated method stub ! return null; } public int rangeCount(byte[] fromKey, byte[] toKey) { ! // TODO Auto-generated method stub ! return 0; } --- 174,317 ---- public boolean contains(byte[] key) { ! ! IPartitionMetadata pmd = fed.getPartition(name, key); ! ! IDataService dataService = fed.getDataService(pmd); ! ! final boolean[] ret; ! ! try { ! ! ret = dataService.batchContains(tx, name, 1, new byte[][]{key}); ! ! } catch(Exception ex) { ! ! throw new RuntimeException(ex); ! ! } ! ! return ret[0]; ! } public Object insert(Object key, Object value) { ! ! IPartitionMetadata pmd = fed.getPartition(name, (byte[])key); ! ! IDataService dataService = fed.getDataService(pmd); ! ! final boolean returnOldValues = true; ! ! final byte[][] ret; ! ! try { ! ! ret = dataService.batchInsert(tx, name, 1, ! new byte[][] { (byte[]) key }, ! new byte[][] { (byte[]) value }, returnOldValues); ! ! } catch (Exception ex) { ! ! throw new RuntimeException(ex); ! ! } ! ! return ret[0]; ! } public Object lookup(Object key) { ! ! IPartitionMetadata pmd = fed.getPartition(name, (byte[])key); ! ! IDataService dataService = fed.getDataService(pmd); ! ! final byte[][] ret; ! ! try { ! ! ret = dataService.batchLookup(tx, name, 1, ! new byte[][] { (byte[]) key }); ! ! } catch (Exception ex) { ! ! throw new RuntimeException(ex); ! ! } ! ! return ret[0]; ! } public Object remove(Object key) { ! ! IPartitionMetadata pmd = fed.getPartition(name, (byte[])key); ! ! IDataService dataService = fed.getDataService(pmd); ! ! final byte[][] ret; ! ! final boolean returnOldValues = true; ! ! try { ! ! ret = dataService.batchRemove(tx, name, 1, ! new byte[][] { (byte[]) key }, returnOldValues ); ! ! } catch (Exception ex) { ! ! 
throw new RuntimeException(ex); ! ! } ! ! return ret[0]; ! } + /** + * Returns the sum of the range count for each index partition spanned by + * the key range. + * + * FIXME support range count that spans more than one partition. + * + * FIXME Handle fromKey == null, toKey == null. + * + * @todo note that it is possible (though unlikely) for an index partition + * split or join to occur during this operation. Figure out how I want + * to handle that, and how I want to handle that with transactional + * isolation (presumably a read-only historical view of the metadata + * index would be used - in which case we need to pass the tx into + * the getPartition() method). + */ public int rangeCount(byte[] fromKey, byte[] toKey) { ! ! IPartitionMetadata pmd1 = fed.getPartition(name, (byte[])fromKey); ! ! IPartitionMetadata pmd2 = fed.getPartition(name, (byte[])toKey); ! ! if(pmd2.getPartitionId()!=pmd1.getPartitionId()) { ! ! throw new UnsupportedOperationException( ! "Can not span partitions at this time"); ! ! } ! ! IDataService dataService = fed.getDataService(pmd1); ! ! int rangeCount = 0; ! ! try { ! ! rangeCount += dataService.rangeCount(IDataService.UNISOLATED, name, ! fromKey, toKey); ! ! } catch(Exception ex) { ! ! throw new RuntimeException(ex); ! ! } ! ! return rangeCount; ! 
} Index: MetadataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MetadataService.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** MetadataService.java 23 Apr 2007 23:44:42 -0000 1.8 --- MetadataService.java 25 Apr 2007 14:57:11 -0000 1.9 *************** *** 55,58 **** --- 55,59 ---- import net.jini.core.lookup.ServiceID; + import com.bigdata.scaleup.IPartitionMetadata; import com.bigdata.scaleup.MasterJournal; import com.bigdata.scaleup.MetadataIndex; *************** *** 96,99 **** --- 97,115 ---- IMetadataService, IServiceShutdown { + /** + * Return the name of the metadata index. + * + * @param indexName + * The name of the scale-out index. + * + * @return The name of the corresponding {@link MetadataIndex} that is used + * to manage the partitions in the named scale-out index. + */ + public static String getMetadataName(String indexName) { + + return "metadata-"+indexName; + + } + protected MetadataService(Properties properties) { *************** *** 120,139 **** } ! ! /** ! * Return the name of the metadata index. ! * ! * @param indexName ! * The name of the scale-out index. ! * ! * @return The name of the corresponding {@link MetadataIndex} that is used ! * to manage the partitions in the named scale-out index. ! */ ! public static String getMetadataName(String indexName) { ! return "metadata-"+indexName; } /** * Registers a metadata index for a named scale-out index and creates the --- 136,192 ---- } ! ! public UUID getManagedIndexUUID(String name) throws IOException { ! // the name of the metadata index itself. ! final String metadataName = getMetadataName(name); ! ! // make sure there is no metadata index for that btree. ! MetadataIndex mdi = (MetadataIndex) journal.getIndex(metadataName); ! ! if(mdi == null) { ! ! return null; ! ! } ! ! return mdi.getManagedIndexUUID(); ! ! } ! ! 
public IPartitionMetadata getPartition(String name,byte[] key) throws IOException { ! ! // the name of the metadata index itself. ! final String metadataName = getMetadataName(name); ! ! // make sure there is no metadata index for that btree. ! MetadataIndex mdi = (MetadataIndex) journal.getIndex(metadataName); ! ! if(mdi == null) { ! ! throw new IllegalArgumentException("Index not registered: " + name); ! ! } ! ! /* ! * @todo this winds up deserializing the value into a PartitionMetadata ! * object and then re-serializing it to return to the remote client. ! */ ! IPartitionMetadata pmd = mdi.find(key); ! ! if( pmd == null ) { ! ! throw new IllegalStateException("No partitioned in index: "+name); ! ! } ! ! return pmd; } + /* + * Tasks. + */ + /** * Registers a metadata index for a named scale-out index and creates the --- EmbeddedDataService.java DELETED --- Index: BigdataClient.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/BigdataClient.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** BigdataClient.java 23 Apr 2007 23:44:42 -0000 1.1 --- BigdataClient.java 25 Apr 2007 14:57:11 -0000 1.2 *************** *** 50,53 **** --- 50,55 ---- import java.io.IOException; import java.rmi.RemoteException; + import java.util.UUID; + import java.util.concurrent.ExecutionException; import net.jini.config.Configuration; *************** *** 64,72 **** --- 66,77 ---- import net.jini.lookup.LookupCache; import net.jini.lookup.ServiceDiscoveryManager; + import net.jini.lookup.ServiceItemFilter; import org.apache.log4j.Logger; import com.bigdata.btree.IIndex; + import com.bigdata.io.SerializerUtil; import com.bigdata.journal.ITransactionManager; + import com.bigdata.journal.CommitRecordIndex.Entry; import com.bigdata.scaleup.IPartitionMetadata; *************** *** 101,104 **** --- 106,113 ---- * transaction identifier for its read and write operations. 
* + * @todo support transactions (there is no transaction manager service yet and + * the 2-/3-phase commit protocol has not been implemented on the + * journal). + * * @todo Write or refactor logic to map operations across multiple partitions. * *************** *** 151,154 **** --- 160,175 ---- private ServiceDiscoveryManager serviceDiscoveryManager = null; private LookupCache metadataServiceLookupCache = null; + private LookupCache dataServiceLookupCache = null; + private DataServiceMap dataServiceMap = new DataServiceMap(); + + private final ServiceTemplate metadataServiceTemplate = new ServiceTemplate( + null, new Class[] { IMetadataService.class }, null); + + private final ServiceTemplate dataServiceTemplate = new ServiceTemplate( + null, new Class[] { IDataService.class }, null); + + private ServiceItemFilter metadataServiceFilter = null; + + private ServiceItemFilter dataServiceFilter = new MetadataServer.DataServiceFilter(); /** *************** *** 170,198 **** * @todo parameter for the bigdata federation? (filter) */ ! public IMetadataService getMetadataService() throws InterruptedException { ! ! ServiceItem item = null; ! ! for(int i=0; i<10; i++) { ! item = metadataServiceLookupCache.lookup(null); ! if (item != null) { ! // found one. ! break; } - - Thread.sleep(200); } ! if(item==null) { ! ! log.warn("No metadata service in cache"); ! return null; } --- 191,299 ---- * @todo parameter for the bigdata federation? (filter) */ ! public IMetadataService getMetadataService() { ! ServiceItem item = metadataServiceLookupCache.lookup(null); ! if (item == null) { ! /* ! * cache miss. do a remote query on the managed set of service ! * registrars. ! */ ! ! log.info("Cache miss."); ! ! final long timeout = 1000L; // millis. ! ! try { ! ! item = serviceDiscoveryManager.lookup(metadataServiceTemplate, ! metadataServiceFilter, timeout); ! ! } catch (RemoteException ex) { ! log.error(ex); ! ! return null; ! ! } catch(InterruptedException ex) { ! ! 
log.info("Interrupted - no match."); ! ! return null; ! ! } ! ! if( item == null ) { ! ! // Could not discover a metadata service. ! ! log.warn("Could not discover metadata service"); ! ! return null; } } ! log.info("Found: "+item); ! ! return (IMetadataService)item.service; ! ! } ! ! /** ! * Resolve the {@link ServiceID} to an {@link IDataService} using a local ! * cache. ! * ! * @param serviceID ! * The identifier for a {@link DataService}. ! * ! * @return The proxy for that {@link DataService} or <code>null</code> iff ! * the {@link DataService} could not be discovered. ! */ ! public IDataService getDataService(ServiceID serviceID) { ! ! ServiceItem item = dataServiceMap.getDataServiceByID(serviceID); ! ! if (item == null) { ! ! /* ! * cache miss. do a remote query on the managed set of service ! * registrars. ! */ ! ! log.info("Cache miss."); ! final long timeout = 1000L; // millis. ! ! try { ! ! item = serviceDiscoveryManager.lookup(dataServiceTemplate, ! dataServiceFilter, timeout); ! ! } catch (RemoteException ex) { ! ! log.error(ex); ! ! return null; ! ! } catch(InterruptedException ex) { ! ! log.info("Interrupted - no match."); ! ! return null; ! ! } ! ! if( item == null ) { ! ! // Could not discover a data service. ! ! log.warn("Could not discover data service"); ! ! return null; ! ! } } *************** *** 200,204 **** log.info("Found: "+item); ! return (IMetadataService)item.service; } --- 301,305 ---- log.info("Found: "+item); ! return (IDataService)item.service; } *************** *** 273,277 **** /* * Setup a helper class that will be notified as services join or leave ! * the various registrars to which the metadata server is listening. */ try { --- 374,378 ---- /* * Setup a helper class that will be notified as services join or leave ! * the various registrars to which the client is listening. */ try { *************** *** 301,313 **** try { ! ServiceTemplate template = new ServiceTemplate(null, ! new Class[] { IMetadataService.class }, null); ! 
metadataServiceLookupCache = serviceDiscoveryManager ! .createLookupCache(template, null /* filter */, null); } catch(RemoteException ex) { ! log.error("Could not setup lookup metadataServiceLookupCache", ex); terminate(); --- 402,437 ---- try { ! metadataServiceLookupCache = serviceDiscoveryManager.createLookupCache( ! metadataServiceTemplate, ! null /* filter */, null); ! ! } catch(RemoteException ex) { ! ! log.error("Could not setup MetadataService LookupCache", ex); ! terminate(); ! ! System.exit(1); ! ! } ! ! /* ! * Setup a LookupCache that is used to keep track of all data services ! * registered with any service registrar to which the client is ! * listening. ! * ! * @todo provide filtering by attributes to select the primary vs ! * failover metadata servers? by attributes identiying the bigdata ! * federation? ! */ ! try { ! ! dataServiceLookupCache = serviceDiscoveryManager.createLookupCache( ! dataServiceTemplate, ! dataServiceFilter /* filter */, dataServiceMap); } catch(RemoteException ex) { ! log.error("Could not setup DataService LookupCache", ex); terminate(); *************** *** 332,335 **** --- 456,467 ---- } + if (dataServiceLookupCache != null) { + + dataServiceLookupCache.terminate(); + + dataServiceLookupCache = null; + + } + if (serviceDiscoveryManager != null) { *************** *** 350,352 **** --- 482,676 ---- } + /** + * Connect to a bigdata federation. + * + * @return + * + * @todo determine how a federation will be identified, e.g., by a name that + * is an {@link Entry} on the {@link MetadataServer} and + * {@link DataServer} service descriptions and provide that name + * attribute here. Note that a {@link MetadataService} can failover, + * so the {@link ServiceID} for the {@link MetadataService} is not the + * invariant, but rather the name attribute for the federation. + */ + public IBigdataFederation connect() { + + return new BigdataFederation(this); + + } + + /** + * Interface to a bigdata federation. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + * + * @todo declare public methods intended for application use. + */ + public static interface IBigdataFederation { + + /** + * A constant that may be used as the transaction identifier when the + * operation is <em>unisolated</em> (non-transactional). The value of + * this constant is ZERO (0L). + */ + public static final long UNISOLATED = 0L; + + /** + * Register a scale-out index with the federation. + * + * @param name + * The index name. + * + * @return The UUID that identifies the scale-out index. + */ + public UUID registerIndex(String name); + + /** + * Obtain a view on a partitioned index. + * + * @param tx + * The transaction identifier or zero(0L) iff the index will + * be unisolated. + * + * @param name + * The index name. + * + * @return The index or <code>null</code> if the index is not + * registered with the {@link MetadataService}. + */ + public IIndex getIndex(long tx, String name); + + } + + /** + * This class encapsulates access to the metadata and data services for a + * bigdata federation. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ + public static class BigdataFederation implements IBigdataFederation { + + private final BigdataClient client; + + public IMetadataService getMetadataService() { + + return client.getMetadataService(); + + } + + public BigdataFederation(BigdataClient client) { + + if(client==null) throw new IllegalArgumentException(); + + this.client = client; + + } + + public UUID registerIndex(String name) { + + try { + + return getMetadataService().registerIndex(name); + + } catch(Exception ex) { + + log.error(ex); + + throw new RuntimeException(ex); + + } + + } + + /** + * @todo support isolated views, share cached data service information + * between isolated and unisolated views. + */ + public IIndex getIndex(long tx, String name) { + + // @todo if(tx!=0l) validate exists? 
+ + return new ClientIndexView(this,tx,name); + + } + + /** + * @todo setup cache. test cache and lookup on metadata service if a + * cache miss. + * + * @param name + * @param key + * @return + */ + public IPartitionMetadata getPartition(String name, byte[] key) { + + // synchronized(indexCache) { + // + // Map<Integer,IDataService> partitionCache = indexCache.get(name); + // + // if(partitionCache==null) { + // + // partitionCache = new ConcurrentHashMap<Integer, IDataService>(); + // + // indexCache.put(name, partitionCache); + // + // } + // + // IDataService dataService = + // + // } + + /* + * Request the index partition metadata for the initial partition of the + * scale-out index. + */ + + IPartitionMetadata pmd; + + try { + + pmd = getMetadataService().getPartition(name, key); + + } catch(Exception ex) { + + throw new RuntimeException(ex); + + } + + return pmd; + + } + // + // private Map<String, Map<Integer, IDataService>> indexCache = new ConcurrentHashMap<String, Map<Integer, IDataService>>(); + + /** + * Resolve the data service to which the index partition was mapped. + * + * @todo use lookup cache in a real client. + */ + public IDataService getDataService(IPartitionMetadata pmd) { + + ServiceID serviceID = JiniUtil + .uuid2ServiceID(pmd.getDataServices()[0]); + + final IDataService dataService; + + try { + + dataService = client.getDataService(serviceID); + + } catch(Exception ex) { + + throw new RuntimeException(ex); + + } + + return dataService; + + } + + } + } Index: DataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataService.java,v retrieving revision 1.13 retrieving revision 1.14 diff -C2 -d -r1.13 -r1.14 *** DataService.java 23 Apr 2007 23:44:42 -0000 1.13 --- DataService.java 25 Apr 2007 14:57:11 -0000 1.14 *************** *** 197,201 **** * gateways is also a common use case. Do we have to handle it specially? */ ! 
public class DataService implements IDataService, IWritePipeline, IResourceTransfer { --- 197,201 ---- * gateways is also a common use case. Do we have to handle it specially? */ ! abstract public class DataService implements IDataService, IWritePipeline, IResourceTransfer { *************** *** 348,351 **** --- 348,359 ---- } + /** + * The unique identifier for this data service - this is used mainly for log + * messages. + * + * @return The unique data service identifier. + */ + protected abstract UUID getDataServiceUUID(); + /* * ITxCommitProtocol. *************** *** 386,395 **** } ! public void registerIndex(String name,UUID indexUUID) throws IOException, InterruptedException, ExecutionException{ ! journal.serialize(new RegisterIndexTask(name,indexUUID)).get(); } ! public void dropIndex(String name) throws IOException, InterruptedException, ExecutionException { --- 394,418 ---- } ! public void registerIndex(String name, UUID indexUUID) throws IOException, ! InterruptedException, ExecutionException { ! ! journal.serialize(new RegisterIndexTask(name, indexUUID)).get(); ! ! } ! ! public UUID getIndexUUID(String name) throws IOException { ! IIndex ndx = journal.getIndex(name); ! ! if(ndx == null) { ! ! return null; ! ! } ! ! return ndx.getIndexUUID(); } ! public void dropIndex(String name) throws IOException, InterruptedException, ExecutionException { *************** *** 407,419 **** } ! public void batchInsert(long tx, String name, int ntuples, byte[][] keys, ! byte[][] vals) throws InterruptedException, ExecutionException { ! ! batchOp( tx, name, new BatchInsert(ntuples, keys, vals)); ! } ! public boolean[] batchContains(long tx, String name, int ntuples, byte[][] keys ! ) throws InterruptedException, ExecutionException { BatchContains op = new BatchContains(ntuples, keys, new boolean[ntuples]); --- 430,447 ---- } ! public byte[][] batchInsert(long tx, String name, int ntuples, ! byte[][] keys, byte[][] vals, boolean returnOldValues) ! 
throws InterruptedException, ExecutionException { ! ! BatchInsert op = new BatchInsert(ntuples, keys, vals); ! ! batchOp(tx, name, op); ! ! return returnOldValues ? (byte[][]) op.values : null; ! } ! public boolean[] batchContains(long tx, String name, int ntuples, ! byte[][] keys) throws InterruptedException, ExecutionException { BatchContains op = new BatchContains(ntuples, keys, new boolean[ntuples]); *************** *** 706,710 **** journal.commit(); ! log.info("registeredIndex: "+name+", indexUUID="+indexUUID); --- 734,738 ---- journal.commit(); ! // @todo log the dataServiceID with each log message for this class. log.info("registeredIndex: "+name+", indexUUID="+indexUUID); Index: AbstractServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/AbstractServer.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** AbstractServer.java 23 Apr 2007 17:22:13 -0000 1.8 --- AbstractServer.java 25 Apr 2007 14:57:10 -0000 1.9 *************** *** 198,202 **** /** ! * Return the assigned {@link ServiceID}. */ public ServiceID getServiceID() { --- 198,204 ---- /** ! * Return the assigned {@link ServiceID}. If this is a new service, then ! * the {@link ServiceID} will be <code>null</code> until it has been ! * assigned by a service registrar. */ public ServiceID getServiceID() { Index: IMetadataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IMetadataService.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** IMetadataService.java 23 Apr 2007 17:22:14 -0000 1.6 --- IMetadataService.java 25 Apr 2007 14:57:10 -0000 1.7 *************** *** 55,58 **** --- 55,61 ---- import net.jini.core.lookup.ServiceItem; + import com.bigdata.scaleup.IPartitionMetadata; + import com.bigdata.scaleup.MetadataIndex; + /** * A metadata service for a named index. 
*************** *** 148,151 **** --- 151,190 ---- public UUID registerIndex(String name) throws IOException, InterruptedException, ExecutionException; + + /** + * Return the unique identifier for the managed index. + * + * @param name + * The name of the managed index. + * + * @return The managed index UUID -or- <code>null</code> if there is no + * managed scale-out index with that name. + * + * @throws IOException + */ + public UUID getManagedIndexUUID(String name) throws IOException; + /** + * Return the metadata for the index partition in which the key would be + * found. + * + * @param name + * The name of the scale-out index. + * @param key + * The key. + * @return The metadata for the index partition in which that key would be + * found. + * + * @throws IOException + * + * FIXME offer a variant that reports the index partitions spanned by a key + * range and write tests for that. Note that the remote API for that method + * should use a result-set data model to efficiently communicate the data + * when there are a large #of spanned partitions. + * + * @see MetadataIndex#find(byte[]) + */ + public IPartitionMetadata getPartition(String name, byte[] key) + throws IOException; + } Index: JiniUtil.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/JiniUtil.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** JiniUtil.java 23 Apr 2007 23:44:42 -0000 1.1 --- JiniUtil.java 25 Apr 2007 14:57:10 -0000 1.2 *************** *** 72,75 **** --- 72,77 ---- public static UUID serviceID2UUID(ServiceID serviceID) { + if(serviceID==null) return null; + return new UUID(serviceID.getMostSignificantBits(), serviceID .getLeastSignificantBits()); *************** *** 88,91 **** --- 90,95 ---- public static ServiceID uuid2ServiceID(UUID uuid) { + if(uuid==null) return null; + return new ServiceID(uuid.getMostSignificantBits(), uuid .getLeastSignificantBits()); |