|
From: Bryan T. <tho...@us...> - 2007-04-23 13:10:20
|
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv20436/src/java/com/bigdata/service Modified Files: AbstractClient.java AbstractServer.java DataServer.java MapReduceService.java IMetadataService.java MetadataService.java MetadataServer.java Added Files: ClientIndexView.java Log Message: The metadata server now tracks join/leaves of data services within a managed set of registrars and has a test case for this feature. Index: DataServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServer.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** DataServer.java 21 Apr 2007 10:37:16 -0000 1.6 --- DataServer.java 23 Apr 2007 13:09:29 -0000 1.7 *************** *** 55,59 **** import com.bigdata.journal.IJournal; - import com.sun.jini.start.LifeCycle; /** --- 55,58 ---- *************** *** 64,70 **** --- 63,79 ---- * passed to the {@link DataServer#DataServer(String[])} constructor or * {@link #main(String[])}. + * <p> * * @see src/resources/config for sample configurations. * + * @todo describe and implement the media replication mechanism and service + * failover. only the primary service is mutable. service instances for + * the same persistent data must determine whether or not another service + * is running (basically, can they obtain a lock to operate as the primary + * service). secondary service instances may provide redundant read-only + * operations from replicated media (or media on a distributed file + * system). the failover mechanisms are essentially the same for the data + * service and for the derived metadata service. + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ *************** *** 84,92 **** } ! public DataServer(String[] args, LifeCycle lifeCycle) { ! ! super( args, lifeCycle ); ! ! } /** --- 93,101 ---- } ! 
// public DataServer(String[] args, LifeCycle lifeCycle) { ! // ! // super( args, lifeCycle ); ! // ! // } /** *************** *** 203,208 **** // // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 212,215 ---- *************** *** 212,217 **** // log.info(""); // - // // TODO Auto-generated method stub - // // } // --- 219,222 ---- *************** *** 219,224 **** // // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 224,227 ---- *************** *** 228,232 **** // log.info(""); // - // // TODO Auto-generated method stub // return null; // } --- 231,234 ---- *************** *** 236,240 **** // log.info(""); // - // // TODO Auto-generated method stub // return null; // } --- 238,241 ---- *************** *** 244,248 **** // log.info(""); // - // // TODO Auto-generated method stub // return null; // } --- 245,248 ---- *************** *** 251,256 **** // // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 251,254 ---- *************** *** 259,270 **** // log.info(""); // - // // TODO Auto-generated method stub - // // } // // public void removeLookupLocators(LookupLocator[] arg0) throws RemoteException { // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 257,264 ---- *************** *** 272,277 **** // public void setLookupGroups(String[] arg0) throws RemoteException { // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 266,269 ---- *************** *** 279,284 **** // public void setLookupLocators(LookupLocator[] arg0) throws RemoteException { // log.info(""); - // - // // TODO Auto-generated method stub // // } --- 271,274 ---- Index: AbstractServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/AbstractServer.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** AbstractServer.java 20 Apr 2007 16:36:27 -0000 1.6 --- AbstractServer.java 23 Apr 
2007 13:09:29 -0000 1.7 *************** *** 95,99 **** * * <pre> ! * java -Djava.security.policy=policy.all -cp lib\jini-ext.jar;lib\start.jar com.sun.jini.start.ServiceStarter src/test/com/bigdata/service/TestServerStarter.config * </pre> * --- 95,99 ---- * * <pre> ! * java -Djava.security.policy=policy.all -cp lib\jini-ext.jar;lib\start.jar com.sun.jini.start.ServiceStarter src/test/com/bigdata/service/TestServerStarter.config * </pre> * *************** *** 139,144 **** * * @todo the {@link DestroyAdmin} implementation on the {@link DataServer} is ! * not working correctly. Untangle the various ways in which things can ! * be stopped vs destroyed. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> --- 139,147 ---- * * @todo the {@link DestroyAdmin} implementation on the {@link DataServer} is ! * not working correctly. Untangle the various ways in which things can be ! * stopped vs destroyed. ! * ! * @todo document exit status codes and unify their use in this and derived ! * classes. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> *************** *** 165,169 **** private ServiceID serviceID; private DiscoveryManagement discoveryManager; ! private JoinManager joinManager; private Configuration config; /** --- 168,172 ---- private ServiceID serviceID; private DiscoveryManagement discoveryManager; ! private JoinManager joinManager; private Configuration config; /** *************** *** 187,190 **** --- 190,201 ---- /** + * The object used to inform the hosting environment that the server is + * unregistering (terminating). A fake object is used when the server is run + * from the command line, otherwise the object is supplied by the + * {@link NonActivatableServiceDescriptor}. + */ + private LifeCycle lifeCycle; + + /** * Return the assigned {@link ServiceID}. */ *************** *** 194,205 **** } ! /** ! * The object used to inform the hosting environment that the server is ! * unregistering (terminating). 
A fake object is used when the server is run ! * from the command line, otherwise the object is supplied by the ! * {@link NonActivatableServiceDescriptor}. ! */ ! private LifeCycle lifeCycle; /** --- 205,216 ---- } + + protected DiscoveryManagement getDiscoveryManagement() { + return discoveryManager; + } ! protected JoinManager getJoinManager() { ! return joinManager; ! } /** *************** *** 228,232 **** * @see NonActivatableServiceDescriptor */ ! protected AbstractServer(String[] args, LifeCycle lifeCycle ) { if (lifeCycle == null) --- 239,243 ---- * @see NonActivatableServiceDescriptor */ ! private AbstractServer(String[] args, LifeCycle lifeCycle ) { if (lifeCycle == null) *************** *** 235,239 **** this.lifeCycle = lifeCycle; ! // @todo verify that this belongs here. System.setSecurityManager(new SecurityManager()); --- 246,250 ---- this.lifeCycle = lifeCycle; ! // @todo verify that this belongs here vs in a main(String[]). System.setSecurityManager(new SecurityManager()); *************** *** 330,333 **** --- 341,345 ---- open = true; + log.info("Impl is "+impl); log.info("Proxy is " + proxy + "(" + proxy.getClass() + ")"); *************** *** 356,360 **** */ discoveryManager = new LookupDiscoveryManager( ! groups, unicastLocators, null // DiscoveryListener ); --- 368,372 ---- */ discoveryManager = new LookupDiscoveryManager( ! groups, unicastLocators, null /*DiscoveryListener*/ ); *************** *** 368,372 **** joinManager = new JoinManager(proxy, // service proxy entries, // attr sets ! serviceID, // ServiceIDListener discoveryManager, // DiscoveryManager new LeaseRenewalManager()); --- 380,384 ---- joinManager = new JoinManager(proxy, // service proxy entries, // attr sets ! serviceID, // ServiceID discoveryManager, // DiscoveryManager new LeaseRenewalManager()); *************** *** 511,514 **** --- 523,544 ---- } + // /* + // * DiscoveryListener + // */ + // + // /** + // * NOP. 
+ // */ + // public void discovered(DiscoveryEvent arg0) { + // log.info("DiscoveryListener.discovered: "+arg0); + // } + // + // /** + // * NOP. + // */ + // public void discarded(DiscoveryEvent arg0) { + // log.info("DiscoveryListener.discarded: "+arg0); + // } + /** * Shutdown the server taking time only to unregister it from jini. --- NEW FILE: ClientIndexView.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. 
Modifications: */ /* * Created on Apr 22, 2007 */ package com.bigdata.service; import java.util.UUID; import com.bigdata.btree.BatchContains; import com.bigdata.btree.BatchInsert; import com.bigdata.btree.BatchLookup; import com.bigdata.btree.BatchRemove; import com.bigdata.btree.IEntryIterator; import com.bigdata.btree.IIndex; import com.bigdata.scaleup.PartitionedIndexView; /** * A client-side view of an index. * * @todo cache leased information about index partitions of interest to the * client. The cache will be a little tricky since we need to know when * the client does not possess a partition definition. Index partitions * are defined by the separator key - the first key that lies beyond that * partition. the danger then is that a client will presume that any key * before the first leased partition is part of that first partition. To * guard against that the client needs to know both the separator key that * represents the upper and lower bounds of each partition. If a lookup in * the cache falls outside of any known partitions upper and lower bounds * then it is a cache miss and we have to ask the metadata service for a * lease on the partition. the cache itself is just a btree data structure * with the proviso that some cache entries represent missing partition * definitions (aka the lower bounds for known partitions where the left * sibling partition is not known to the client). * * @todo support partitioned indices by resolving client operations against each * index partition as necessary and maintaining leases with the metadata * service - the necessary logic is in the {@link PartitionedIndexView} * and can be refactored for this purpose. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class ClientIndexView implements IIndex { protected final AbstractClient client; protected final String name; protected IMetadataService mdproxy; protected UUID indexUUID; // @todo final. 
public ClientIndexView(AbstractClient client, String name) { if(client==null) throw new IllegalArgumentException(); if(name==null) throw new IllegalArgumentException(); this.client = client; this.name = name; /* * obtain the proxy for a metadata service. if this instance fails, then * we can always ask for a new instance for the same distributed * database (failover). */ // mdproxy = client.getMetadataService(); /* * obtain the indexUUID. this is a means of verifying that the index * exists. */ // indexUUID = mdproxy.getIndexUUID(name); } public UUID getIndexUUID() { return indexUUID; } public boolean contains(byte[] key) { // TODO Auto-generated method stub return false; } public Object insert(Object key, Object value) { // TODO Auto-generated method stub return null; } public Object lookup(Object key) { // TODO Auto-generated method stub return null; } public Object remove(Object key) { // TODO Auto-generated method stub return null; } public int rangeCount(byte[] fromKey, byte[] toKey) { // TODO Auto-generated method stub return 0; } public IEntryIterator rangeIterator(byte[] fromKey, byte[] toKey) { // TODO Auto-generated method stub return null; } public void contains(BatchContains op) { // TODO Auto-generated method stub } public void insert(BatchInsert op) { // TODO Auto-generated method stub } public void lookup(BatchLookup op) { // TODO Auto-generated method stub } public void remove(BatchRemove op) { // TODO Auto-generated method stub } } Index: AbstractClient.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/AbstractClient.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** AbstractClient.java 27 Mar 2007 14:34:23 -0000 1.1 --- AbstractClient.java 23 Apr 2007 13:09:29 -0000 1.2 *************** *** 117,125 **** /** - * The exported proxy for the service implementation object. 
- */ - protected Remote proxy; - - /** * Server startup reads {@link Configuration} data from the file(s) named by * <i>args</i>, starts the service, and advertises the service for --- 117,120 ---- Index: MetadataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MetadataService.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** MetadataService.java 27 Mar 2007 17:11:42 -0000 1.4 --- MetadataService.java 23 Apr 2007 13:09:29 -0000 1.5 *************** *** 79,83 **** * corresponds to the commit time of interest for the database.) */ ! public class MetadataService implements IMetadataService, IServiceShutdown { /** --- 79,83 ---- * corresponds to the commit time of interest for the database.) */ ! public class MetadataService extends DataService implements IMetadataService, IServiceShutdown { /** *************** *** 91,100 **** public MetadataService(Properties properties) { ! /* ! * @todo setup/resolve the journal and the metadata index on ! * the journal. ! */ ! ! journal = new Journal(properties); } --- 91,95 ---- public MetadataService(Properties properties) { ! 
super(properties); } *************** *** 105,127 **** } - public int getEntryCount(String name) { - // TODO Auto-generated method stub - return 0; - } - - public int rangeCount(String name,byte[] fromKey,byte[] toKey) { - // TODO Auto-generated method stub - return 0; - } - - public void shutdown() { - // TODO Auto-generated method stub - - } - - public void shutdownNow() { - // TODO Auto-generated method stub - - } - } --- 100,102 ---- Index: MetadataServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MetadataServer.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** MetadataServer.java 20 Apr 2007 16:36:27 -0000 1.2 --- MetadataServer.java 23 Apr 2007 13:09:29 -0000 1.3 *************** *** 48,61 **** package com.bigdata.service; import java.rmi.Remote; import java.rmi.RemoteException; import java.util.Properties; ! import net.jini.core.lookup.ServiceMatches; ! import net.jini.core.lookup.ServiceRegistrar; import net.jini.core.lookup.ServiceTemplate; ! ! import com.bigdata.journal.IJournal; ! import com.sun.jini.start.LifeCycle; /** --- 48,67 ---- package com.bigdata.service; + import java.io.IOException; import java.rmi.Remote; import java.rmi.RemoteException; + import java.util.Map; import java.util.Properties; + import java.util.concurrent.ConcurrentHashMap; ! import net.jini.core.lookup.ServiceID; ! import net.jini.core.lookup.ServiceItem; import net.jini.core.lookup.ServiceTemplate; ! import net.jini.lease.LeaseRenewalManager; ! import net.jini.lookup.LookupCache; ! import net.jini.lookup.ServiceDiscoveryEvent; ! import net.jini.lookup.ServiceDiscoveryListener; ! import net.jini.lookup.ServiceDiscoveryManager; ! import net.jini.lookup.ServiceItemFilter; /** *************** *** 70,80 **** * the same group. While running, it tracks when data services start and stop so * that it can (re-)allocate index partitions as necessary. ! * <p> ! 
* The metadata server uses a write through pipeline to replicate its data onto ! * registered secondary metadata servers. If the metadata server fails, clients ! * will automatically fail over to a secondary metadata server. Only the primary ! * metadata server actively tracks the state of data services since secondaries ! * are updated via the write through pipeline to ensure consistency. Secondary ! * metadata servers will notice if the primary dies and elect a new master. * * @todo note that the service update registration is _persistent_ (assuming --- 76,83 ---- * the same group. While running, it tracks when data services start and stop so * that it can (re-)allocate index partitions as necessary. ! * ! * @todo aggregate host load data and service RPC events and report them ! * periodically so that we can track load and make load balancing ! * decisions. * * @todo note that the service update registration is _persistent_ (assuming *************** *** 82,90 **** * wrinkle to how a bigdata instance must be configured. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public class MetadataServer extends AbstractServer { /** * @param args --- 85,106 ---- * wrinkle to how a bigdata instance must be configured. * + * @todo should destroy destroy the service instance or the persistent state as + * well? Locally, or as replicated? + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public class MetadataServer extends DataServer implements ServiceDiscoveryListener { + private ServiceDiscoveryManager serviceDiscoveryManager = null; + private LookupCache lookupCache = null; + private Map<ServiceID, ServiceItem> serviceIdMap = new ConcurrentHashMap<ServiceID, ServiceItem>(); + + protected LookupCache getLookupCache() { + + return lookupCache; + + } + /** * @param args *************** *** 94,111 **** super(args); ! } ! /** ! * @param args ! * @param lifeCycle ! */ ! 
public MetadataServer(String[] args, LifeCycle lifeCycle) { ! ! super(args, lifeCycle); ! ! ! } protected Remote newService(Properties properties) { --- 110,164 ---- super(args); ! /* ! * Setup a helper class that will be notified as services join or leave ! * the various registrars to which the metadata server is listening. ! */ ! try { ! serviceDiscoveryManager = new ServiceDiscoveryManager(getDiscoveryManagement(), ! new LeaseRenewalManager()); ! ! } catch(IOException ex) { ! ! log.error("Could not initiate service discovery manager", ex); ! ! System.exit(1); ! ! } ! ! /* ! * Setup a lookupCache that will be populated with all services that match a ! * filter. This is used to keep track of all data services registered ! * with any service registrar to which the metadata server is listening. ! */ ! try { ! ! ServiceTemplate template = new ServiceTemplate(null, ! new Class[] { IDataService.class }, null); ! ! lookupCache = serviceDiscoveryManager ! .createLookupCache(template, ! new DataServiceFilter() /* filter */, this /* ServiceDiscoveryListener */); ! ! } catch(RemoteException ex) { ! ! log.error("Could not setup lookup lookupCache", ex); ! ! System.exit(1); ! ! } ! } + // /** + // * @param args + // * @param lifeCycle + // */ + // public MetadataServer(String[] args, LifeCycle lifeCycle) { + // + // super(args, lifeCycle); + // + // } + protected Remote newService(Properties properties) { *************** *** 115,212 **** /** ! * Return an {@link IMetadataService}. * ! * @param registrar ! * A service registrar to query. ! * ! * @return An {@link IMetadataService} if one was found using that ! * registrar. */ ! public IMetadataService getMetadataService(ServiceRegistrar registrar) { ! Class[] classes = new Class[] {IMetadataService.class}; ! ServiceTemplate template = new ServiceTemplate(null, classes, null); ! IMetadataService proxy = null; ! try { ! proxy = (IMetadataService) registrar.lookup(template); ! ! } catch(java.rmi.RemoteException e) { ! 
log.warn(e); ! ! } ! return proxy; } /** ! * Return the data service(s) matched on this registrar. ! * ! * @param registrar ! * ! * @return The data service or <code>null</code> if none was matched. * ! * @todo we need to describe the services to be discovered by their primary ! * interface and only search within a designated group that ! * corresponds to the bigdata federation of interest - that group is ! * part of the client configuration. * ! * @todo how do we ensure that we have seen all data services? If we query ! * each registrar as it is discovered and then register for updates ! * there are two ways in which we could miss some instances: (1) new ! * data services register between the query and the registration for ! * updates; and (2) the query will not return _ALL_ data services ! * registered, but only as match as the match limit. */ ! public ServiceMatches getDataServices(ServiceRegistrar registrar) { ! Class[] classes = new Class[] {IDataService.class}; ! ServiceTemplate template = new ServiceTemplate(null, classes, null); ! try { - return registrar.lookup(template,0); - - } catch(java.rmi.RemoteException e) { - - log.warn(e); - - return null; - - } - } /** ! * Extends the behavior to close and delete the journal in use by the ! * metadata service. */ public void destroy() { - - MetadataService service = (MetadataService)impl; ! super.destroy(); ! try { ! ! IJournal journal = service.journal; ! ! log.info("Closing and deleting: "+journal.getFile()); ! ! journal.closeAndDelete(); ! ! log.info("Journal deleted."); ! ! } catch (Throwable t) { ! ! log.warn("Could not delete journal: " + t, t); ! ! } } --- 168,543 ---- /** ! * Filter matches a {@link DataService} but not a {@link MetadataService}. ! * <p> * ! * @todo This explicitly filters out service variants that extend ! * {@link DataService} but which are not tasked as a ! * {@link DataService} by the {@link MetadataService}. It would be ! 
* easier if we refactored the interface hierarchy a bit so that there ! * was a common interface and abstract class extended by both the ! * {@link DataService} and the {@link MetadataService} such that we ! * could match on their specific interfaces without the possibility of ! * confusion. ! * ! * @author <a href="mailto:tho...@us...">Bryan Thompson</a> ! * @version $Id$ */ ! public static class DataServiceFilter implements ServiceItemFilter { ! ! public boolean check(ServiceItem item) { ! ! if(item.service==null) { ! ! log.warn("Service is null: "+item); ! ! return false; ! ! } ! ! if(!(item.service instanceof IMetadataService)) { ! ! log.info("Matched: "+item); ! ! return true; ! ! } ! ! log.debug("Ignoring: "+item); ! ! return false; ! ! } ! }; ! /* ! * ServiceDiscoveryListener. ! */ ! ! /** ! * Adds the {@link ServiceItem} to the internal map to support {@link #getDataServiceByID(ServiceID)}. ! * <p> ! * Note: This event is generated by the {@link LookupCache}. There is an ! * event for each {@link DataService} as it joins any registrar in the set ! * of registrars to which the {@link MetadataServer} is listening. The set ! * of distinct joined {@link DataService}s is accessible via the ! * {@link LookupCache}. ! */ ! public void serviceAdded(ServiceDiscoveryEvent e) { ! log.info("" + e + ", class=" ! + e.getPostEventServiceItem().toString()); ! serviceIdMap.put(e.getPostEventServiceItem().serviceID, e ! .getPostEventServiceItem()); ! } ! /** ! * Replaces the entry in the internal map with the post-event {@link ServiceItem}. ! */ ! public void serviceChanged(ServiceDiscoveryEvent e) { ! log.info(""+e+", class=" ! + e.getPostEventServiceItem().toString()); ! ! serviceIdMap.put(e.getPostEventServiceItem().serviceID, e ! .getPostEventServiceItem()); } /** ! * Removes the pre-event {@link ServiceItem} from the internal map. ! */ ! public void serviceRemoved(ServiceDiscoveryEvent e) { ! ! log.info(""+e+", class=" ! + e.getPreEventServiceItem().toString()); ! ! serviceIdMap.remove(e.getPreEventServiceItem().serviceID); ! ! } ! ! /** ! * Resolve the {@link ServiceID} for a {@link DataService} to the cached ! 
* {@link ServiceItem} for that {@link DataService}. * ! * @param serviceID ! * The {@link ServiceID} for the {@link DataService}. * ! * @return The cache {@link ServiceItem} for that {@link DataService}. */ ! public ServiceItem getDataServiceByID(ServiceID serviceID) { ! return serviceIdMap.get(serviceID); ! } ! ! /** ! * Return the #of {@link DataService}s known to this {@link MetadataServer}. ! * ! * @return The #of {@link DataService}s in the {@link LookupCache}. ! */ ! public int getDataServiceCount() { ! return serviceIdMap.size(); } + // /* + // * DiscoveryListener + // */ + // + // /** + // * Extends base implementation to register for notification of + // * {@link DataService} join/leave events + // * + // * @todo this must be a low-latency handler. + // * + // * @todo pay attention iff registrar is for a group that is used by this + // * metadata service as configured. + // * + // * @todo figure out delta in registrars and in group-to-registrar mapping. + // * + // * @todo register for join/leave events for {@link IDataService} on each new + // * registrar. + // * + // * @todo task a worker to query the new registrar(s) for any existing + // * {@link IDataService}s and continue to query until all such + // * services on the registrar have been discovered. This is the pool of + // * {@link DataService}s that are available to the + // * {@link MetadataService} for management. 
+ // */ + // public void discovered(DiscoveryEvent e) { + // + // super.discovered(e); + // + // ServiceRegistrar[] registrars = e.getRegistrars(); + // + // Map<Integer,String[]> registrarToGroups = (Map<Integer,String[]>) e.getGroups(); + // + // final int nregistrars = registrars.length; + // + // log.info("Reported: "+nregistrars+" registrars"); + // + // ServiceTemplate template = new ServiceTemplate(null, + // new Class[] { IDataService.class }, null); + // + // final long leaseDuration = 5000; + // + // final MarshalledObject handbackObject; + // + // try { + // + // handbackObject = new MarshalledObject("handback"); + // + // } catch (IOException ex) { + // + // // the string must be serializable.... + // throw new AssertionError(ex); + // + // } + // + // for(int i=0; i<nregistrars; i++) { + // + // ServiceRegistrar registrar = registrars[i]; + // + // ServiceMatches matches = getDataServices(registrar); + // + // log.info("Reported: "+matches.totalMatches+" data services on registrar"); + // + // /* + // * Note: This is a persistent registration. + // * + // * @todo share a single remote listener object for all registered + // * events? + // * + // * @todo match all appropriate transitions. + // * + // * @todo explore uses for the handback object - should there be one + // * per registrar? it can serve as a key that identifies the + // * registrar.... + // * + // * @todo the last argument is a lease duration. we will have to + // * renew the lease. see what management classes exist to make this + // * easier ( LeaseRenewalManager ). + // */ + // try { + // + // EventRegistration reg = registrar.notify(template, + // ServiceRegistrar.TRANSITION_NOMATCH_MATCH // created. + //// |ServiceRegistrar.TRANSITION_MATCH_MATCH // modified. + // |ServiceRegistrar.TRANSITION_MATCH_NOMATCH // deleted. 
+ // , new NotifyListener(), handbackObject, leaseDuration); + // + // System.err.println("EventRegistration: "+reg); + // + // } catch (RemoteException ex) { + // + // log.error("Could not register for notification", ex); + // + // } + // + // } + // + // } + // + // /** + // * @todo When a registrar is discarded, do we have to do anything? If the + // * registrar was providing redundancy, then we can still reach the + // * various data services. If the registrar was the sole resolver for + // * some services then those services are no longer available -- and + // * probably a WARNing should be logged. + // */ + // public void discarded(DiscoveryEvent e) { + // + // super.discarded(e); + // + // } + // + // /** + // * RemoteEventListener - events are generated by our registered persistent + // * notification with one or more service registrars that we use to listen + // * for join/leave of data services. + // * <p> + // * This class extends {@link UnicastRemoteObject} so that it can run in the + // * {@link ServiceRegistrar}. The class MUST be located in the unpacked JAR + // * identified by the <em>codebase</em>. + // * + // * @todo I have not figured out yet how to get the events back to the + // * {@link MetadataServer} - they are being written in the console in + // * which jini is running since they are received remotely and then + // * need to be passed back to the {@link MetadataServer} somehow. + // * + // * @todo perhaps pass in the {@link ServiceRegistrar} or the + // * {@link MarshalledObject} so that we can identify the service for + // * which the event was generated and pass the event to code on the + // * {@link MetadataServer} instance (transient reference?) that will + // * actually handle the event (notice the join/leave of a data + // * service). 
+ // * + // * @see http://archives.java.sun.com/cgi-bin/wa?A2=ind0410&L=jini-users&D=0&P=30410 + // * + // * @see http://archives.java.sun.com/cgi-bin/wa?A2=ind0410&L=jini-users&D=0&P=29391 + // * + // * @see ServiceDiscoveryManager which can encapsulate the entire problem of + // * listening and also enumerating the existing services. However, its + // * use is limited by NAT (it will not cross a firewall). + // */ + // public static class NotifyListener extends UnicastRemoteObject implements RemoteEventListener { + // + // /** + // * + // */ + // private static final long serialVersionUID = -5847172051441883860L; + // + // public NotifyListener() throws RemoteException { + // super(); + // } + // + // public NotifyListener(int port) throws RemoteException { + // super(port); + // } + // + // /** + // * + // * @param e + // * @throws UnknownEventException + // * @throws RemoteException + // */ + // public void notify(RemoteEvent e) throws UnknownEventException, RemoteException { + // + // System.err.println("notify(RemoveEvent:"+e+")"); + // log.info(e.toString()); + // + // } + // + // } + + // /** + // * Return an {@link IMetadataService}. + // * + // * @param registrar + // * A service registrar to query. + // * + // * @return An {@link IMetadataService} if one was found using that + // * registrar. + // */ + // public IMetadataService getMetadataService(ServiceRegistrar registrar) { + // + // Class[] classes = new Class[] {IMetadataService.class}; + // + // ServiceTemplate template = new ServiceTemplate(null, classes, null); + // + // IMetadataService proxy = null; + // + // try { + // + // proxy = (IMetadataService) registrar.lookup(template); + // + // } catch(java.rmi.RemoteException e) { + // + // log.warn(e); + // + // } + // + // return proxy; + // + // } + // + // /** + // * Return the data service(s) matched on this registrar. + // * + // * @param registrar + // * The {@link ServiceRegistrar} to be queried. 
+ // * + // * @return The data service or <code>null</code> if none was matched. + // * + // * @todo we need to describe the services to be discovered by their primary + // * interface and only search within a designated group that + // * corresponds to the bigdata federation of interest - that group is + // * part of the client configuration. + // * + // * @todo we need to filter out matches on {@link MetadataService} since it + // * extends {@link DataService}. + // * + // * @todo how do we ensure that we have seen all data services? If we query + // * each registrar as it is discovered and then register for updates + // * there are two ways in which we could miss some instances: (1) new + // * data services register between the query and the registration for + // * updates; and (2) the query will not return _ALL_ data services + // * registered, but only as many as the match limit. + // */ + // public ServiceMatches getDataServices(ServiceRegistrar registrar) { + // + // Class[] classes = new Class[] {IDataService.class}; + // + // ServiceTemplate template = new ServiceTemplate(null, classes, null); + // + // try { + // + // return registrar.lookup(template,0); + // + // } catch(java.rmi.RemoteException e) { + // + // log.warn(e); + // + // return null; + // + // } + // + // } + // /** ! * Extends the behavior to terminate {@link LookupCache} and ! * {@link ServiceDiscoveryManager} processing. */ public void destroy() { ! lookupCache.terminate(); ! serviceDiscoveryManager.terminate(); ! ! super.destroy(); } Index: MapReduceService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MapReduceService.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** MapReduceService.java 22 Mar 2007 15:04:15 -0000 1.2 --- MapReduceService.java 23 Apr 2007 13:09:29 -0000 1.3 *************** *** 255,259 **** IMetadataService mds = getMetadataService(); ! 
final int nentries = mds.rangeCount(name,fromKey,toKey); return null; --- 255,260 ---- IMetadataService mds = getMetadataService(); ! final int nentries = mds.rangeCount(IDataService.UNISOLATED, name, ! fromKey, toKey); return null; Index: IMetadataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IMetadataService.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** IMetadataService.java 13 Apr 2007 15:04:19 -0000 1.4 --- IMetadataService.java 23 Apr 2007 13:09:29 -0000 1.5 *************** *** 63,87 **** * compatible with RMI. * - * @todo extend IDataService - * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public interface IMetadataService extends Remote { ! ! /** ! * The approximate number of entries in the index (non-transactional). ! */ ! public int getEntryCount(String name) throws IOException; ! ! /** ! * The approximate number of entries in the index for the specified key ! * range (non-transactional). ! * ! * @param fromKey ! * @param toKey ! * @return ! */ ! public int rangeCount(String name,byte[] fromKey,byte[] toKey) throws IOException; /** --- 63,70 ---- * compatible with RMI. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public interface IMetadataService extends IDataService { /** |