From: <btm...@us...> - 2010-11-14 16:46:02
|
Revision: 3948 http://bigdata.svn.sourceforge.net/bigdata/?rev=3948&view=rev Author: btmurphy Date: 2010-11-14 16:45:55 +0000 (Sun, 14 Nov 2010) Log Message: ----------- [branch dev-btm]: CHECKPOINT - phase 1 of callable executor (client service) smart proxy work. Added code to deal with the new asynchronous initialization of the shard (data) service; that is, code that tests to determine whether the shard service is not ready because it's still initializing, and then performs a set of retries to allow the shard service to complete its startup processing Modified Paths: -------------- branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/EmbeddedShardLocator.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestBigdataClient.java Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/EmbeddedShardLocator.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/EmbeddedShardLocator.java 2010-11-12 21:29:20 UTC (rev 3947) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/EmbeddedShardLocator.java 2010-11-14 16:45:55 UTC (rev 3948) @@ -1338,10 +1338,10 @@ /* * Note: By setting this to null we are indicating to * the RegisterIndexTask on the data service that it - * needs to set the resourceMetadata[] when the index is - * actually registered based on the live journal as of - * the when the task actually executes on the data - * service. + * needs to set the resourceMetadata[] when the index + * is actually registered based on the live journal + * as of the when the task actually executes on the + * data service. */ null,//[resources] Signal to the RegisterIndexTask. 
null //[cause] Signal to RegisterIndexTask @@ -1350,11 +1350,53 @@ // */ // ,"createScaleOutIndex(name="+scaleOutIndexName+") " )); - - dataServices[i].registerIndex - (Util.getIndexPartitionName(scaleOutIndexName, - pmd.getPartitionId()), - md); + + // The shard service (as currently implemented) may not be + // completely initialized if it is just being started + // when this method is called (for example, in a test + // environment). This is because the shard service creates + // a StoreManager (via a Resource), which depends on + // discovering a transaction service; and it sets up + // counters, which depend on discovering a load balancer. + // Thus, to address the case where the shard service is + // not yet ready, test for such a situation; and apply a + // retry-to-failure strategy + boolean registered = false; + try { + dataServices[i].registerIndex + ( Util.getIndexPartitionName(scaleOutIndexName, + pmd.getPartitionId()), + md ); + registered = true; + } catch(Throwable t1) { + if ( !Util.causeNoSuchObject(t1) ) { + throw new Exception(t1); + } + //wait for data service to finish initializing + int nWait = 5; + for(int n=0; n<nWait; n++) { + Util.delayMS(1000L); + try { + dataServices[i].registerIndex + ( Util.getIndexPartitionName + (scaleOutIndexName, + pmd.getPartitionId()), + md ); + registered = true; + break; + } catch(Throwable t2) { + if ( !Util.causeNoSuchObject(t2) ) { + throw new Exception(t2); + } + } + } + } + if (!registered) {// try one last time + dataServices[i].registerIndex + ( Util.getIndexPartitionName(scaleOutIndexName, + pmd.getPartitionId()), + md ); + } partitions[i] = pmd; } Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java 2010-11-12 21:29:20 UTC (rev 3947) +++ 
branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java 2010-11-14 16:45:55 UTC (rev 3948) @@ -273,8 +273,8 @@ ((this.localResources).getScheduledExecutor()) .scheduleWithFixedDelay (deferredInitTask, - 20L*1000L,//initial delay - 30L*1000L,//period + 1L*1000L,//initial delay + 3L*1000L,//period TimeUnit.MILLISECONDS); } } @@ -318,7 +318,9 @@ * not complete within a timeout. */ public boolean isOpen() { - return ( (concurrencyMgr != null) && (concurrencyMgr.isOpen()) ); + return ( (concurrencyMgr != null) && + (concurrencyMgr.isOpen()) && + deferredInitDone ); } synchronized public void shutdown() { Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java 2010-11-12 21:29:20 UTC (rev 3947) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java 2010-11-14 16:45:55 UTC (rev 3948) @@ -86,6 +86,7 @@ import java.io.File; import java.io.IOException; +import java.rmi.NoSuchObjectException; import java.rmi.RemoteException; import java.util.ArrayList; import java.util.Arrays; @@ -188,7 +189,7 @@ throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); embeddedShardService.registerIndex(name, metadata); } @@ -196,14 +197,14 @@ throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); embeddedShardService.dropIndex(name); } public IBlock readBlock(IResourceMetadata resource, long addr) throws RemoteException, IOException { - readyState.check(); + readyCheck(); return embeddedShardService.readBlock(resource, addr); } @@ -211,7 +212,7 @@ throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); return embeddedShardService.getIndexMetadata(name, timestamp); } @@ -225,7 +226,7 @@ 
throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); return embeddedShardService.rangeIterator (tx, name, fromKey, toKey, capacity, flags, filter); } @@ -234,7 +235,7 @@ public <T> Future<T> submit(IDataServiceCallable<T> task) throws RemoteException { - readyState.check(); + readyCheck(); Exporter exporter = null; try { exporter = Util.getExporter(config, @@ -256,7 +257,7 @@ public Future submit(long tx, String name, IIndexProcedure proc) throws RemoteException { - readyState.check(); + readyCheck(); Exporter exporter = null; try { exporter = Util.getExporter(config, @@ -279,19 +280,19 @@ public boolean purgeOldResources(long timeout, boolean truncateJournal) throws RemoteException, InterruptedException { - readyState.check(); + readyCheck(); return embeddedShardService.purgeOldResources(timeout,truncateJournal); } public void setReleaseTime(long releaseTime) throws RemoteException, IOException { - readyState.check(); + readyCheck(); embeddedShardService.setReleaseTime(releaseTime); } public void abort(long tx) throws RemoteException, IOException { - readyState.check(); + readyCheck(); embeddedShardService.abort(tx); } @@ -299,14 +300,14 @@ throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); return embeddedShardService.singlePhaseCommit(tx); } public void prepare(long tx, long revisionTime) throws RemoteException, IOException, Throwable { - readyState.check(); + readyCheck(); embeddedShardService.prepare(tx, revisionTime); } @@ -314,19 +315,19 @@ throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); embeddedShardService.forceOverflow(immediate, compactingMerge); } public long getAsynchronousOverflowCounter() throws RemoteException, IOException { - readyState.check(); + readyCheck(); return embeddedShardService.getAsynchronousOverflowCounter(); } public boolean 
isOverflowActive() throws RemoteException, IOException { - readyState.check(); + readyCheck(); return embeddedShardService.isOverflowActive(); } @@ -668,10 +669,40 @@ readyState.ready();//ready to accept calls from clients } + // Private methods + + // Throws NoSuchObjectException if the service has either + // not completed initialization and registration with the + // lookup service, or not created the concurrency manager + // and resource (which depends on discovering the transaction + // service) + private void readyCheck() { + readyState.check();//completed service init? + + // created concurrency and resource managers? + if ( !embeddedShardService.isOpen() ) { + throw new RemoteExceptionWrapper + (new NoSuchObjectException("not ready")); + } + } + private void shutdownDo(ShutdownType type) { (new ShutdownThread(type)).start(); } + private void killDo(int status) { + String[] groups = ((DiscoveryGroupManagement)ldm).getGroups(); + LookupLocator[] locs = ((DiscoveryLocatorManagement)ldm).getLocators(); + logger.log(Level.INFO, killStr+" [groups=" + +Util.writeGroupArrayToString(groupsToJoin) + +", locators=" + +Util.writeArrayElementsToString(locatorsToJoin)+"]"); + + System.exit(status); + } + + // Nested classes + /** * Used to shutdown the service asynchronously. 
*/ @@ -745,15 +776,15 @@ } } - private void killDo(int status) { - String[] groups = ((DiscoveryGroupManagement)ldm).getGroups(); - LookupLocator[] locs = ((DiscoveryLocatorManagement)ldm).getLocators(); - logger.log(Level.INFO, killStr+" [groups=" - +Util.writeGroupArrayToString(groupsToJoin) - +", locators=" - +Util.writeArrayElementsToString(locatorsToJoin)+"]"); - - System.exit(status); + private static class RemoteExceptionWrapper extends RuntimeException { + private static final long serialVersionUID = 1L; + private final RemoteException wrapped; + public RemoteExceptionWrapper(RemoteException wrapped) { + this.wrapped = wrapped; + } + private Object writeReplace() { + return wrapped; + } } /** Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java 2010-11-12 21:29:20 UTC (rev 3947) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java 2010-11-14 16:45:55 UTC (rev 3948) @@ -73,6 +73,7 @@ import net.jini.lookup.ServiceDiscoveryManager; import java.io.IOException; +import java.rmi.NoSuchObjectException; import java.rmi.Remote; import java.rmi.server.ExportException; import java.util.Collection; @@ -110,6 +111,26 @@ return min; } + public static void delayMS(long nMS) { + try { + Thread.sleep(nMS); + } catch (InterruptedException e) { } + } + + public static boolean causeNoSuchObject(Throwable t) { + if (t instanceof NoSuchObjectException) return true; + + // test cause chain for NoSuchObjectException + Throwable cause = t.getCause(); + while ( (cause != null) && + !(cause instanceof NoSuchObjectException) ) + { + cause = cause.getCause(); + } + if (cause == null) return false; + return (cause instanceof NoSuchObjectException); + } + /* Convenience method that can be called when a service exits, or * when failure occurs during the service's initialization process. 
* This method un-does any work that may have already been completed; Modified: branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestBigdataClient.java =================================================================== --- branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestBigdataClient.java 2010-11-12 21:29:20 UTC (rev 3947) +++ branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestBigdataClient.java 2010-11-14 16:45:55 UTC (rev 3948) @@ -132,7 +132,33 @@ metadata.setDeleteMarkers(true); - fed.registerIndex(metadata); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE fed.registerIndex(metadata); + boolean registered = false; + try { + fed.registerIndex(metadata); + registered = true; + } catch(Throwable t1) { + if ( !Util.causeNoSuchObject(t1) ) { + throw new Exception(t1); + } + //wait for data service to finish initializing + int nWait = 5; + for(int i=0; i<nWait; i++) { + Util.delayMS(1000L); + try { + fed.registerIndex(metadata); + registered = true; + break; + } catch(Throwable t2) { + if ( !Util.causeNoSuchObject(t2) ) { + throw new Exception(t2); + } + } + } + } + assertTrue("failed to register metadata", registered); +//BTM - PRE_CLIENT_SERVICE - END final IIndex ndx = fed.getIndex(name, ITx.UNISOLATED); @@ -166,9 +192,7 @@ if (dataService0 == null) { for(int i=0; i<nWait; i++) { - try { - Thread.sleep(1L*1000L); - } catch (InterruptedException e) { } + Util.delayMS(1000L); dataService0 = helper.getDataService0(); if (dataService0 != null) break; } @@ -182,9 +206,7 @@ if (dataService1 == null) { for(int i=0; i<nWait; i++) { - try { - Thread.sleep(1L*1000L); - } catch (InterruptedException e) { } + Util.delayMS(1000L); dataService1 = helper.getDataService1(); if (dataService1 != null) break; } @@ -194,19 +216,60 @@ } //BTM - END --------------------------------------------------- - final UUID indexUUID = fed.registerIndex( metadata, // - // separator keys. 
- new byte[][] { - new byte[]{}, - TestKeyBuilder.asSortKey(500) - },// - // data service assignments. - new UUID[] { // -//BTM helper.getDataService0().getServiceUUID(),// -//BTM helper.getDataService1().getServiceUUID() // -dataService0UUID, -dataService1UUID - }); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE final UUID indexUUID = fed.registerIndex( metadata, // +//BTM - PRE_CLIENT_SERVICE // separator keys. +//BTM - PRE_CLIENT_SERVICE new byte[][] { +//BTM - PRE_CLIENT_SERVICE new byte[]{}, +//BTM - PRE_CLIENT_SERVICE TestKeyBuilder.asSortKey(500) +//BTM - PRE_CLIENT_SERVICE },// +//BTM - PRE_CLIENT_SERVICE // data service assignments. +//BTM - PRE_CLIENT_SERVICE new UUID[] { // +//BTM - PRE_CLIENT_SERVICE dataService0UUID, +//BTM - PRE_CLIENT_SERVICE dataService1UUID +//BTM - PRE_CLIENT_SERVICE }); + UUID indexUUID = null; + boolean registered = false; + try { + indexUUID = + fed.registerIndex + ( metadata, + new byte[][] + { new byte[]{}, + TestKeyBuilder.asSortKey(500) }, + new UUID[] { dataService0UUID, + dataService1UUID } + ); + registered = true; + } catch(Throwable t1) { + if ( !Util.causeNoSuchObject(t1) ) { + throw new Exception(t1); + } + //wait for data service to finish initializing + nWait = 5; + for(int i=0; i<nWait; i++) { + Util.delayMS(1000L); + try { + indexUUID = + fed.registerIndex + ( metadata, + new byte[][] + { new byte[]{}, + TestKeyBuilder.asSortKey(500) }, + new UUID[] { dataService0UUID, + dataService1UUID } + ); + registered = true; + break; + } catch(Throwable t2) { + if ( !Util.causeNoSuchObject(t2) ) { + throw new Exception(t2); + } + } + } + } + assertTrue("failed to register metadata", registered); +//BTM - PRE_CLIENT_SERVICE - END final IIndex ndx = fed.getIndex(name, ITx.UNISOLATED); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <btm...@us...> - 2010-11-23 21:39:41
|
Revision: 3983 http://bigdata.svn.sourceforge.net/bigdata/?rev=3983&view=rev Author: btmurphy Date: 2010-11-23 21:39:34 +0000 (Tue, 23 Nov 2010) Log Message: ----------- [branch dev-btm]: CHECKPOINT - added first cut of the QuorumPeerManager class that wraps the ZooKeeper client for discovery and error handling, added the testQuorumPeerManager method to the QuorumPeerServiceTest class (note also that as part of the previous checkpoint, the zookeeper jar file was upgraded to the latest zookeeper-3.3.2.jar release, which included changing build.xml to reference the new jar file in the compile classpath) Modified Paths: -------------- branches/dev-btm/bigdata-jini/src/test/com/bigdata/quorum/QuorumPeerServiceTest.java Added Paths: ----------- branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/quorum/ branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/quorum/QuorumPeerManager.java Added: branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/quorum/QuorumPeerManager.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/quorum/QuorumPeerManager.java (rev 0) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/quorum/QuorumPeerManager.java 2010-11-23 21:39:34 UTC (rev 3983) @@ -0,0 +1,694 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +package com.bigdata.jini.quorum; + +import com.bigdata.attr.QuorumPeerAttr; +import com.bigdata.service.QuorumPeerService; +import com.bigdata.service.Service; +import com.bigdata.util.EntryUtil; +import com.bigdata.util.Util; +import com.bigdata.util.config.LogUtil; + +import net.jini.core.entry.Entry; +import net.jini.core.lookup.ServiceID; +import net.jini.core.lookup.ServiceItem; +import net.jini.core.lookup.ServiceTemplate; +import net.jini.lookup.LookupCache; +import net.jini.lookup.ServiceDiscoveryEvent; +import net.jini.lookup.ServiceDiscoveryListener; +import net.jini.lookup.ServiceDiscoveryManager; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.net.InetAddress; +import java.rmi.RemoteException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Helper class that wraps the <code>org.apache.zookeeper.ZooKeeper</code> + * client class, providing additional covenient functionality related to + * dynamic discovery of the connection information associated with the + * peers in the federation's ensemble, as well as exception and session + * expiry handling. 
+ */ +public class QuorumPeerManager { + + private Logger logger; + + private ServiceDiscoveryManager sdm; + private int sessionTimeout; + + // How long to wait for the ensemble to be discovered (and the + // connectString to be constructed) + // 0L ==> don't wait, the caller will provide its own retry logic + // Long.MAX_VALUE ==> wait forever + // negative ==> use backoff strategy specified by this class + private long discoverWait; + + // How long to wait for a connection before declaring failure + // 0L ==> try to connect only once + // Long.MAX_VALUE ==> wait forever + // negative ==> use backoff strategy specified by this class + private long connectWait; + + private LookupCache quorumServiceCache; + private Map<UUID, String> hostPortMap = + new ConcurrentHashMap<UUID, String>(); + + private volatile String connectString = null; + private volatile ZooKeeper zkClient; + private volatile boolean terminated = false; + + private Object syncObj = new Object(); + private static long[] discoverBackoff = + {1L, 2L, 4L, 8L, 16L, 32L, 64L, 128L, 256L};//seconds + private static long[] connectBackoff = + {1L, 2L, 4L, 8L, 16L, 32L, 64L, 128L};//seconds + + public QuorumPeerManager(ServiceDiscoveryManager sdm, + int sessionTimeout, + Logger logger) + { + this(sdm, sessionTimeout, -1L, -1L, logger); + } + + public QuorumPeerManager(ServiceDiscoveryManager sdm, + int sessionTimeout, + long discoverWait, + long connectWait, + Logger logger) + { + if (sdm == null) { + throw new NullPointerException("null sdm"); + } + this.sdm = sdm; + ServiceDiscoveryListener cacheListener = new CacheListener(logger); + + // Discover all QuorumPeerServices that have the join the federation + // the given sdm is configured to discover (by groups and/or locs) + + Class[] quorumServiceType = new Class[] {QuorumPeerService.class}; + ServiceTemplate quorumServiceTmpl = + new ServiceTemplate(null, quorumServiceType, null); + try { + this.quorumServiceCache = sdm.createLookupCache(quorumServiceTmpl, 
+ null,//filter + cacheListener); + } catch(RemoteException e) { + logger.warn(e.getMessage(), e); + } + + this.sessionTimeout = sessionTimeout; + this.discoverWait = discoverWait; + this.connectWait = connectWait; + this.logger = (logger == null ? + LogUtil.getLog4jLogger((this.getClass()).getName()) : + logger); + } + + // Wrapped methods from org.apache.zookeper.ZooKeeper client class + + public void addAuthInfo(String scheme, byte[] auth) throws IOException { + checkTerminated(); + getClient().addAuthInfo(scheme, auth); + } + + public void close() { + if (terminated) return; + if ( (zkClient != null) && (zkClient.getState().isAlive()) ) { + try { + zkClient.close(); + } catch(InterruptedException e) {//swallow + } + } + terminated = true; + connectString = null; + } + + public String create(String path, + byte[] data, + List<ACL> acl, + CreateMode createMode) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().create(path, data, acl, createMode); + } + + public void create(String path, + byte[] data, + List<ACL> acl, + CreateMode createMode, + AsyncCallback.StringCallback cb, + Object ctx) throws IOException + { + checkTerminated(); + getClient().create(path, data, acl, createMode, cb, ctx); + } + + public void delete(String path, int version) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + getClient().delete(path, version); + } + + public void delete(String path, + int version, + AsyncCallback.VoidCallback cb, + Object ctx) throws IOException + { + checkTerminated(); + getClient().delete(path, version, cb, ctx); + } + + public Stat exists(String path, Watcher watcher) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().exists(path, watcher); + } + + public Stat exists(String path, boolean watch) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().exists(path, 
watch); + } + + public void exists(String path, + Watcher watcher, + AsyncCallback.StatCallback cb, + Object ctx) + throws IOException + { + checkTerminated(); + getClient().exists(path, watcher, cb, ctx); + } + + public void exists(String path, + boolean watch, + AsyncCallback.StatCallback cb, + Object ctx) + throws IOException + { + checkTerminated(); + getClient().exists(path, watch, cb, ctx); + } + + public byte[] getData(String path, Watcher watcher, Stat stat) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().getData(path, watcher, stat); + } + + public byte[] getData(String path, boolean watch, Stat stat) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().getData(path, watch, stat); + } + + public void getData(String path, + Watcher watcher, + AsyncCallback.DataCallback cb, + Object ctx) throws IOException + { + checkTerminated(); + getClient().getData(path, watcher, cb, ctx); + } + + public void getData(String path, + boolean watch, + AsyncCallback.DataCallback cb, + Object ctx) throws IOException + { + checkTerminated(); + getClient().getData(path, watch, cb, ctx); + } + + public Stat setData(String path, byte[] data, int version) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().setData(path, data, version); + } + + public void setData(String path, + byte[] data, + int version, + AsyncCallback.StatCallback cb, + Object ctx) throws IOException + { + checkTerminated(); + getClient().setData(path, data, version, cb, ctx); + } + + public List<ACL> getACL(String path, Stat stat) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().getACL(path, stat); + } + + public void getACL(String path, + Stat stat, + AsyncCallback.ACLCallback cb, + Object ctx) throws IOException + { + checkTerminated(); + getClient().getACL(path, stat, cb, ctx); + } + + 
public Stat setACL(String path, List<ACL> acl, int version) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().setACL(path, acl, version); + } + + public void setACL(String path, + List<ACL> acl, + int version, + AsyncCallback.StatCallback cb, + Object ctx) throws IOException + { + checkTerminated(); + getClient().setACL(path, acl, version, cb, ctx); + } + + public List<String> getChildren(String path, Watcher watcher) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().getChildren(path, watcher); + } + + public List<String> getChildren(String path, boolean watch) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().getChildren(path, watch); + } + + public void getChildren(String path, + Watcher watcher, + AsyncCallback.ChildrenCallback cb, + Object ctx) throws IOException + { + checkTerminated(); + getClient().getChildren(path, watcher, cb, ctx); + } + + public void getChildren(String path, + boolean watch, + AsyncCallback.ChildrenCallback cb, + Object ctx) throws IOException + { + checkTerminated(); + getClient().getChildren(path, watch, cb, ctx); + } + + public List<String> getChildren(String path, Watcher watcher, Stat stat) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().getChildren(path, watcher, stat); + } + + public List<String> getChildren(String path, boolean watch, Stat stat) + throws IOException, KeeperException, InterruptedException + { + checkTerminated(); + return getClient().getChildren(path, watch, stat); + } + + public void getChildren(String path, + Watcher watcher, + AsyncCallback.Children2Callback cb, + Object ctx) throws IOException + { + checkTerminated(); + getClient().getChildren(path, watcher, cb, ctx); + } + + public void getChildren(String path, + boolean watch, + AsyncCallback.Children2Callback cb, + Object ctx) throws 
IOException + { + checkTerminated(); + getClient().getChildren(path, watch, cb, ctx); + } + + public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) + throws IOException + { + checkTerminated(); + getClient().sync(path, cb, ctx); + } + + public ZooKeeper.States getState() throws IOException { + checkTerminated(); + return getClient().getState(); + } + + public String toString() { + checkTerminated(); + try { + return getClient().toString(); + } catch(IOException e) { + // default value when client unavailable + return "[connectString="+connectString + +", sessionTimeout="+sessionTimeout + +", connectWait="+connectWait + +", terminated="+terminated+"]"; + } + } + + // Private methods + + private ZooKeeper getClient() throws IOException { + if ( (zkClient != null) && (zkClient.getState().isAlive()) ) { + return zkClient; + } + + // Determine if ensemble has been discovered yet + + if (connectString == null) { + if(discoverWait > 0L) {//retry for discoverWait seconds + for (long i=0L; i<discoverWait; i++) { + Util.delayMS(1L*1000L); + if (connectString != null) break; + if (logger.isDebugEnabled()) { + logger.debug("ensemble still not discovered " + +"[try #"+(i+1)+"]"); + } + } + } else if (discoverWait < 0L) {//retry with backoff + for (int i=0; i<discoverBackoff.length; i++) { + Util.delayMS(discoverBackoff[i]*1000L); + if (connectString != null) break; + if (logger.isDebugEnabled()) { + logger.debug("ensemble still not discovered " + +"[try #"+(i+1)+"]"); + } + } + } + if (connectString == null) {//still not discovered ==> fail + zkClient = null; + throw new IllegalStateException + ("never discovered zookeeper ensemble"); + } + } + + // Ensemble discovered and connectString constructed, construct client + + zkClient = new ZooKeeper(connectString, + sessionTimeout, + new ZookeeperEventListener(logger)); + + // Connect to ensemble + + ZooKeeper.States state = zkClient.getState(); + logger.debug("state[try #0] = "+state); + + if ( 
!state.equals(ZooKeeper.States.CONNECTED) ) { + boolean connected = false; + if (connectWait == 0L) {//tried once above + zkClient = null; + } else if (connectWait > 0L) {//retry until connected or timeout + for (long i=0L; i<connectWait; i++) { + Util.delayMS(1L*1000L); + state = zkClient.getState(); + if (logger.isDebugEnabled()) { + logger.debug("state[try #"+(i+1)+"] = "+state); + } + if ( state.equals(ZooKeeper.States.CONNECTED) ) { + connected = true; + break; + } + } + } else { //connectWait < 0L ==> retry with default backoff + for (int i=0; i<connectBackoff.length; i++) { + Util.delayMS(connectBackoff[i]*1000L); + state = zkClient.getState(); + if (logger.isDebugEnabled()) { + logger.debug("state[try #"+(i+1)+"] = "+state); + } + if ( state.equals(ZooKeeper.States.CONNECTED) ) { + connected = true; + break; + } + } + } + if (!connected) zkClient = null;//never connected + } + if (zkClient == null) { + throw new IllegalStateException("zookeeper ensemble unavailable"); + } + return zkClient; + } + + private void checkTerminated() { + if (terminated) { + throw new IllegalStateException("QuorumPeerManager terminated"); + } + } + + // Nested class(es) + + private class CacheListener implements ServiceDiscoveryListener { + private Logger logger; + CacheListener(Logger logger) { + this.logger = logger; + } + public void serviceAdded(ServiceDiscoveryEvent event) { + ServiceItem item = event.getPostEventServiceItem(); + + ServiceID serviceId = item.serviceID; + Object service = item.service; + Entry[] attrs = item.attributeSets; + + Class serviceType = service.getClass(); + UUID serviceUUID = ((Service)service).getServiceUUID(); + + QuorumPeerAttr quorumPeerAttr = + (QuorumPeerAttr)(EntryUtil.getEntryByType + (attrs, QuorumPeerAttr.class)); + + InetAddress peerAddr = quorumPeerAttr.address; + int clientPort = quorumPeerAttr.clientPort; + int ensembleSize = quorumPeerAttr.nQuorumPeers; + + if(logger.isDebugEnabled()) { + logger.log(Level.DEBUG, "1 of 
"+ensembleSize+" quorum peer(s) " + +"DISCOVERED [addr="+peerAddr+", port=" + +clientPort+"]"); + } + hostPortMap.put(serviceUUID, peerAddr+":"+clientPort); + + // Build connectString when all expected peers found + synchronized(syncObj) { + if (hostPortMap.size() == ensembleSize) { + Iterator<String> itr = (hostPortMap.values()).iterator(); + //build connectString + StringBuffer strBuf = null; + if (itr.hasNext()) { + strBuf = new StringBuffer(itr.next()); + } + while( itr.hasNext() ) { + strBuf.append(","+itr.next()); + } + connectString = strBuf.toString(); + logger.debug("connectString = "+connectString); + } + } + } + + public void serviceRemoved(ServiceDiscoveryEvent event) { + ServiceItem item = event.getPreEventServiceItem(); + + ServiceID serviceId = item.serviceID; + Object service = item.service; + Entry[] attrs = item.attributeSets; + + Class serviceType = service.getClass(); + UUID serviceUUID = ((Service)service).getServiceUUID(); + + QuorumPeerAttr quorumPeerAttr = + (QuorumPeerAttr)(EntryUtil.getEntryByType + (attrs, QuorumPeerAttr.class)); + + InetAddress peerAddr = quorumPeerAttr.address; + int clientPort = quorumPeerAttr.clientPort; + int ensembleSize = quorumPeerAttr.nQuorumPeers; + + if(logger.isDebugEnabled()) { + logger.log(Level.DEBUG, "1 of "+ensembleSize+" quorum peer(s) " + +"DOWN [addr="+peerAddr+", port="+clientPort+"]"); + } + hostPortMap.remove(serviceUUID); + } + + public void serviceChanged(ServiceDiscoveryEvent event) { + + ServiceItem preItem = event.getPreEventServiceItem(); + ServiceItem postItem = event.getPostEventServiceItem(); + + ServiceID serviceId = postItem.serviceID; + Object service = postItem.service; + + Class serviceType = service.getClass(); + + Entry[] preAttrs = preItem.attributeSets; + Entry[] postAttrs = postItem.attributeSets; + + UUID serviceUUID = ((Service)service).getServiceUUID(); + + QuorumPeerAttr preQuorumPeerAttr = null; + QuorumPeerAttr postQuorumPeerAttr = null; + + if (preAttrs != null) { + 
preQuorumPeerAttr = + (QuorumPeerAttr)(EntryUtil.getEntryByType + (preAttrs, QuorumPeerAttr.class)); + } + if (postAttrs != null) { + postQuorumPeerAttr = + (QuorumPeerAttr)(EntryUtil.getEntryByType + (postAttrs, QuorumPeerAttr.class)); + } + + InetAddress prePeerAddr = null; + int preClientPort = Integer.MIN_VALUE; + int preEnsembleSize = Integer.MIN_VALUE; + if (preQuorumPeerAttr != null) { + prePeerAddr = preQuorumPeerAttr.address; + preClientPort = preQuorumPeerAttr.clientPort; + preEnsembleSize = preQuorumPeerAttr.nQuorumPeers; + } + + InetAddress postPeerAddr = null; + int postClientPort = Integer.MIN_VALUE; + int postEnsembleSize = Integer.MIN_VALUE; + if (postQuorumPeerAttr != null) { + postPeerAddr = postQuorumPeerAttr.address; + postClientPort = postQuorumPeerAttr.clientPort; + postEnsembleSize = postQuorumPeerAttr.nQuorumPeers; + } + + if ((preQuorumPeerAttr != null) && (postQuorumPeerAttr != null)) { + String logStr = "quorum peer(s) CHANGED [pre: addr=" + +prePeerAddr+", port="+preClientPort + +", ensembleSize="+preEnsembleSize + +" >>> post: addr="+postPeerAddr + +", port="+postClientPort+", ensembleSize=" + +postEnsembleSize+"]"; + if ( (prePeerAddr == null) || (postPeerAddr == null) ) { + logger.warn(logStr); + return; + } + if ( !(prePeerAddr.equals(postPeerAddr)) ) { + logger.warn(logStr); + return; + } + if (preClientPort != postClientPort) { + logger.warn(logStr); + return; + } + if (preEnsembleSize != postEnsembleSize) { + logger.warn(logStr); + return; + } + logger.debug(logStr); + } else if( (preQuorumPeerAttr == null) && + (postQuorumPeerAttr != null)) + { + logger.warn("quorum peer(s) CHANGED [attribute added >>> " + +"post: addr="+postPeerAddr+", port=" + +postClientPort+", ensembleSize=" + +postEnsembleSize+"]"); + return; + } else {// pre != null, post == null ==> removed attr + logger.warn("quorum peer(s) CHANGED [pre: addr="+prePeerAddr + +", port="+preClientPort+", ensembleSize=" + +preEnsembleSize+" >>> attribute removed]"); + 
return; + } + } + } + + private static class ZookeeperEventListener implements Watcher { + private Logger logger; + + public ZookeeperEventListener(Logger logger) { + this.logger = + (logger == null ? + LogUtil.getLog4jLogger((this.getClass()).getName()) : + logger); + } + + public void process(WatchedEvent event) { + KeeperState eventState = event.getState(); + switch (eventState) { + case Unknown: + logger.warn + ("zookeeper event [state="+eventState + +", event="+event+"]"); + break; + case Disconnected: + logger.debug("zookeeper event [state="+eventState+"]");; + break; + case SyncConnected: + logger.debug("zookeeper event [state="+eventState+"]");; + break; + case Expired: + logger.warn("zookeeper event [state="+eventState+"]"); + break; + } + + } + } + +} Property changes on: branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/quorum/QuorumPeerManager.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/dev-btm/bigdata-jini/src/test/com/bigdata/quorum/QuorumPeerServiceTest.java =================================================================== --- branches/dev-btm/bigdata-jini/src/test/com/bigdata/quorum/QuorumPeerServiceTest.java 2010-11-23 19:58:22 UTC (rev 3982) +++ branches/dev-btm/bigdata-jini/src/test/com/bigdata/quorum/QuorumPeerServiceTest.java 2010-11-23 21:39:34 UTC (rev 3983) @@ -38,6 +38,7 @@ import junit.framework.TestCase; import junit.framework.TestSuite; +import com.bigdata.jini.quorum.QuorumPeerManager; import com.bigdata.service.QuorumPeerService; import com.bigdata.util.Util; import com.bigdata.util.config.NicUtil; @@ -71,7 +72,11 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.Watcher.Event.KeeperState; import 
java.io.BufferedReader; import java.io.File; @@ -198,7 +203,7 @@ private static String[] groupsToDiscover = new String[] {"qaQuorumGroup"}; private static LookupLocator[] locsToDiscover = new LookupLocator[0]; private static DiscoveryManagement ldm; - protected static ServiceDiscoveryManager sdm; + private static ServiceDiscoveryManager sdm; private static CacheListener cacheListener; private static LookupCache quorumCache; @@ -467,9 +472,11 @@ logger.debug("\n\n"+testName+" EXIT\n"); } -// @Test(timeout=5000) - public void testGetSessionId() throws Exception { - testName = "testGetSessionId"; + // Verifies that the ZooKeeper client can be used to connect to the + // ensemble within a given amount of time +// @Test(timeout=20000) + public void testZooKeeperConnect() throws Exception { + testName = "testZooKeeperConnect"; testPassed = false; logger.info("\n\n-- "+testName+" ENTER ----------\n"); @@ -482,22 +489,68 @@ String connectString = strBuf.toString(); logger.info("connectString = "+connectString); - int sessionTimeout = 10000;//10 seconds - ZooKeeper zkClient = - new ZooKeeper(connectString, sessionTimeout, null); + int sessionTimeout = 40*1000;//max when tickTime is 2000 + ZooKeeper zkClient = new ZooKeeper(connectString, + sessionTimeout, + new ZookeeperEventListener()); + ZooKeeper.States state = zkClient.getState(); + logger.info("state[try #0] = "+state); - long sessionId = zkClient.getSessionId(); - logger.info("sessionId = "+sessionId); - + if ( !state.equals(ZooKeeper.States.CONNECTED) ) { + int nWait = 10; + for (int i=0; i<nWait; i++) { + Util.delayMS(1L*1000L); + state = zkClient.getState(); + logger.info("state[try #"+(i+1)+"] = "+zkClient.getState()); + if ( state.equals(ZooKeeper.States.CONNECTED) ) break; + } + } + if ( state.equals(ZooKeeper.States.CONNECTED) ) { + testPassed = true; + } zkClient.close(); + logger.debug("\n\n"+testName+" EXIT\n"); + } + // Verifies that the QuorumPeerManager class that wraps the ZooKeeper + // client can be 
used to discover and connect to the ensemble started + // by this test class. +// @Test(timeout=20000) + public void testQuorumPeerManagerConnect() throws Exception { + testName = "testQuorumPeerManagerConnect"; + testPassed = false; + logger.info("\n\n-- "+testName+" ENTER ----------\n"); + + int sessionTimeout = 40*1000;//max when tickTime is 2000 + QuorumPeerManager peerMgr = + new QuorumPeerManager(sdm, sessionTimeout, logger); + assertTrue("failed on QuorumPeerManager instantiation " + +"[null returned]", (peerMgr != null) ); + + ZooKeeper.States state = null; + try { + state = peerMgr.getState(); + } catch(IOException e) { + logger.warn("failed on QuorumPeerManager instantiation", e); + return; + } + assertTrue("getState failed [null]", (state != null) ); + logger.info("state = "+state); + + assertTrue("getState failed [not connected]", + state.equals(ZooKeeper.States.CONNECTED) ); + testPassed = true; + peerMgr.close(); logger.debug("\n\n"+testName+" EXIT\n"); } // Special test that is always the last test; to clearly distinguish the // logged output produced by the previous tests from the logged output // produced by the tearDown process. + // + // REMOVE this test when/if this test class is changed to use the + // @BeforeClass annotation. 
public void testLast() throws Exception { logger.info("\n\n-- BEGIN TEARDOWN ----------\n"); lastTest = true; @@ -761,8 +814,34 @@ } } - static class ServiceStarterTask implements Runnable { + private static class ZookeeperEventListener implements Watcher { + public void process(WatchedEvent event) { + KeeperState eventState = event.getState(); + switch (eventState) { + case Unknown: + logger.warn + ("zookeeper event [state="+eventState + +", event="+event+"]"); + break; + case Disconnected: + logger.info + ("zookeeper event [state="+eventState+"]");; + break; + case SyncConnected: + logger.info + ("zookeeper event [state="+eventState+"]");; + break; + case Expired: + logger.warn + ("zookeeper event [state="+eventState+"]"); + break; + } + } + } + + private static class ServiceStarterTask implements Runnable { + private String serviceStateDir; private String ensembleSizeOverride; private String clientPortOverride; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <btm...@us...> - 2010-12-09 22:59:02
|
Revision: 4001 http://bigdata.svn.sourceforge.net/bigdata/?rev=4001&view=rev Author: btmurphy Date: 2010-12-09 22:58:55 +0000 (Thu, 09 Dec 2010) Log Message: ----------- [branch dev-btm]: CHECKPOINT - changes & fixes to allow com.bigdata.jini tests to use the smart proxy based zookeeper; in particular, changes to the zookeeper shutdown code in JiniServicesHelper Modified Paths: -------------- branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/loadbalancer/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/process/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/transaction/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java branches/dev-btm/bigdata-jini/src/resources/config/bigdataStandaloneTesting.config branches/dev-btm/bigdata-jini/src/test/com/bigdata/jini/start/AbstractFedZooTestCase.java branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.java branches/dev-btm/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -457,7 +457,7 @@ Boolean.FALSE); this.sdm = new ServiceDiscoveryManager(ldm, null, config); - if (zookeeperAccessor == null) { + if ( (zookeeperAccessor == null) || !(zookeeperAccessor.isOpen()) ) { setZookeeperConfigInfo(config, this.sdm); zookeeperAccessor = new ZooKeeperAccessor Modified: 
branches/dev-btm/bigdata-jini/src/java/com/bigdata/loadbalancer/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/loadbalancer/ServiceImpl.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/loadbalancer/ServiceImpl.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -350,7 +350,13 @@ } //Service export and proxy creation - ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0); + String exportIpAddr = + NicUtil.getIpAddress + ( "default.nic", + ConfigDeployUtil.getString("node.serviceNetwork"), + false ); + ServerEndpoint endpoint = + TcpServerEndpoint.getInstance(exportIpAddr, 0); InvocationLayerFactory ilFactory = new BasicILFactory(); Exporter defaultExporter = new BasicJeriExporter(endpoint, ilFactory, Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/process/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/process/ServiceImpl.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/process/ServiceImpl.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -346,7 +346,13 @@ this.scheduledExecutor = Executors.newScheduledThreadPool(1); // Service export and proxy creation - ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0); + String exportIpAddr = + NicUtil.getIpAddress + ( "default.nic", + ConfigDeployUtil.getString("node.serviceNetwork"), + false ); + ServerEndpoint endpoint = + TcpServerEndpoint.getInstance(exportIpAddr, 0); InvocationLayerFactory ilFactory = new BasicILFactory(); Exporter defaultExporter = new BasicJeriExporter(endpoint, ilFactory, Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java 2010-12-08 17:39:23 
UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -289,7 +289,13 @@ serviceId = configStateInfo.getServiceId(); //Service export and proxy creation - ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0); + String exportIpAddr = + NicUtil.getIpAddress + ( "default.nic", + ConfigDeployUtil.getString("node.serviceNetwork"), + false ); + ServerEndpoint endpoint = + TcpServerEndpoint.getInstance(exportIpAddr, 0); InvocationLayerFactory ilFactory = new BasicILFactory(); Exporter defaultExporter = new BasicJeriExporter(endpoint, ilFactory, Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -57,6 +57,16 @@ import java.util.ArrayList; import java.util.UUID; +//BTM - FOR_ZOOKEEPER_SMART_PROXY - BEGIN +import com.bigdata.service.QuorumPeerService; +import com.bigdata.util.config.NicUtil; +import com.sun.jini.admin.DestroyAdmin; +import net.jini.admin.Administrable; +import net.jini.core.lookup.ServiceItem; +import net.jini.core.lookup.ServiceTemplate; +import net.jini.lookup.ServiceDiscoveryManager; +//BTM - FOR_ZOOKEEPER_SMART_PROXY - END + /** * A helper class that starts all the necessary services for a Jini federation. * This is used when testing, but NOT for benchmarking performance. 
For @@ -509,8 +519,12 @@ clientPort = getPort(2181/* suggestedPort */); final int peerPort = getPort(2888/* suggestedPort */); final int leaderPort = getPort(3888/* suggestedPort */); - final String servers = "1=localhost:" + peerPort + ":" - + leaderPort; +//BTM - PRE_ZOOKEEPER_SMART_PROXY - BEGIN +//BTM - PRE_ZOOKEEPER_SMART_PROXY final String servers = "1=localhost:" + peerPort + ":" +//BTM - PRE_ZOOKEEPER_SMART_PROXY + leaderPort; + String hostname = NicUtil.getIpAddress("default.nic", "default", true); + final String servers = "1="+hostname+":" + peerPort + ":" + leaderPort; +//BTM - PRE_ZOOKEEPER_SMART_PROXY - END //BTM - FOR_CLIENT_SERVICE - BEGIN //BTM - FOR_CLIENT_SERVICE options = new String[] { @@ -553,8 +567,7 @@ .getInstance(concat(args, options)); // start zookeeper (a server instance). -//BTM log.warn("\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER\n"); +System.out.println("\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER: args[0] = "+args[0]+"\n"); //BTM - PRE_ZOOKEEPER_SMART_PROXY - BEGIN //BTM - PRE_ZOOKEEPER_SMART_PROXY final int nstarted = ZookeeperProcessHelper.startZookeeper( //BTM - PRE_ZOOKEEPER_SMART_PROXY config, serviceListener); @@ -563,8 +576,7 @@ (com.bigdata.quorum.ServiceImpl.class, //BTM - was QuorumPeerMain.class config, serviceListener); //BTM - PRE_ZOOKEEPER_SMART_PROXY - END -//BTM log.warn("\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER - DONE\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER - DONE\n"); +System.out.println("\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER - DONE\n"); if (nstarted != 1) { @@ -995,22 +1007,92 @@ //BTM log.warn("\n---------------- JiniServicesHelper.destroy BEGIN DESTROY ----------------\n"); //BTM 
com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.destroy BEGIN DESTROY ----------------\n"); - ZooKeeper zookeeper = null; - - ZookeeperClientConfig zooConfig = null; - +//BTM - FOR_ZOOKEEPER_SMART_PROXY - BEGIN --------------------------------------------------- +//BTM - FOR_ZOOKEEPER_SMART_PROXY ZooKeeper zookeeper = null; +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY ZookeeperClientConfig zooConfig = null; +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY if (client != null && client.isConnected()) { +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY zooConfig = client.getFederation().getZooConfig(); +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY zookeeper = client.getFederation().getZookeeper(); +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY client.disconnect(true/* immediateShutdown */); +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY client = null; +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY } +//BTM - FOR_ZOOKEEPER_SMART_PROXY + ServiceItem[] items = null; if (client != null && client.isConnected()) { - zooConfig = client.getFederation().getZooConfig(); - - zookeeper = client.getFederation().getZookeeper(); - + // 1. Clear out everything in zookeeper + ZookeeperClientConfig zooConfig = client.getFederation().getZooConfig(); + ZooKeeper zookeeper = client.getFederation().getZookeeper(); + try { +System.out.println("\n---------------- JiniServicesHelper.destroy BEGIN ZOOKEEPER DELETE"); + zookeeper.delete(zooConfig.zroot, -1); // version +System.out.println("---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); + } catch (Exception e) { + log.warn("zroot=" + zooConfig.zroot + " : "+ e.getLocalizedMessage(), e); +System.out.println("\n---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); + } + + //2. 
For graceful shutdown of QuorumPeerService + ServiceDiscoveryManager sdm = + client.getFederation().getServiceDiscoveryManager(); + Class[] quorumServiceType = + new Class[] {QuorumPeerService.class}; + ServiceTemplate quorumServiceTmpl = + new ServiceTemplate(null, quorumServiceType, null); + items = sdm.lookup(quorumServiceTmpl, Integer.MAX_VALUE, null); + // Graceful shutdown of QuorumPeerService + if (items != null) { + for (int i=0; i<items.length; i++) { + QuorumPeerService zk = (QuorumPeerService)(items[i].service); + try { + Object admin = ((Administrable)zk).getAdmin(); +System.out.println("\n---------------- JiniServicesHelper.destroy BEGIN QuorumPeerService DESTROY"); + ((DestroyAdmin)admin).destroy(); +System.out.println("---------------- JiniServicesHelper.destroy END QuorumPeerService DESTROY\n"); + } catch(Exception e) { + log.warn("failure on zookeeper destroy ["+zk+"]", e); +System.out.println("\n---------------- JiniServicesHelper.destroy END QuorumPeerService DESTROY\n"); + } + } + } else {//items == null + try { +System.out.println("\n---------------- JiniServicesHelper.destroy BEGIN org.apache.zookeeper.server.quorum.QuorumPeerMain KILL"); + ZooHelper.kill(clientPort); +System.out.println("---------------- JiniServicesHelper.destroy END org.apache.zookeeper.server.quorum.QuorumPeerMain KILL\n"); + } catch(Exception e) { + log.warn("failure on zookeeper kill " + +"[clientPort="+clientPort+"]", e); +System.out.println("\n---------------- JiniServicesHelper.destroy END org.apache.zookeeper.server.quorum.QuorumPeerMain KILL\n"); + } + } + + //3. Kill the process(es) in which zookeeper is running + for (ProcessHelper t : ((ServiceListener)serviceListener).running) { + if (t instanceof ZookeeperProcessHelper) { +System.out.println("\n*** KILLING ProcessHelper: "+t+"\n"); + try { + t.kill(true); + } catch(Exception e) { + log.error("exception during process ["+t+"]", e); + } + } + } + + //4. 
Disconnect client.disconnect(true/* immediateShutdown */); - client = null; - } +//BTM - FOR_ZOOKEEPER_SMART_PROXY - END ----------------------------------------------------- + + //BTM if (metadataServer0 != null) { //BTM //BTM metadataServer0.destroy(); @@ -1109,43 +1191,48 @@ txnService0 = null; } - if (zookeeper != null && zooConfig != null) { +//BTM - FOR_ZOOKEEPER_SMART_PROXY - BEGIN +//BTM - FOR_ZOOKEEPER_SMART_PROXY - Note: moved the shutdown/killing of zookeeper to beginning +//BTM - FOR_ZOOKEEPER_SMART_PROXY - of this method. Performing it here left the zookeeper +//BTM - FOR_ZOOKEEPER_SMART_PROXY - process still running; and when the new smart proxy +//BTM - FOR_ZOOKEEPER_SMART_PROXY - based QuorumPeerService is used instead of the +//BTM - FOR_ZOOKEEPER_SMART_PROXY - org.apache.zookeeper.server.quorum.QuorumPeerMain, +//BTM - FOR_ZOOKEEPER_SMART_PROXY - the process left running by the first test will +//BTM - FOR_ZOOKEEPER_SMART_PROXY - cause all subsequent tests to fail because of +//BTM - FOR_ZOOKEEPER_SMART_PROXY - BindException (port already in use). Destroying +//BTM - FOR_ZOOKEEPER_SMART_PROXY - the service and killing the process, as is done +//BTM - FOR_ZOOKEEPER_SMART_PROXY - above, addresses this issue. +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY if (zookeeper != null && zooConfig != null) { +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY try { +//BTM - FOR_ZOOKEEPER_SMART_PROXY // clear out everything in zookeeper for this federation. 
+//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy BEGIN ZOOKEEPER DELETE\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY zookeeper.delete(zooConfig.zroot, -1); // version +//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY } catch (Exception e) { +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY // ignore. +//BTM - FOR_ZOOKEEPER_SMART_PROXY log.warn("zroot=" + zooConfig.zroot + " : " +//BTM - FOR_ZOOKEEPER_SMART_PROXY + e.getLocalizedMessage(), e); +//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY } +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY } +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY try { +//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+")\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY ZooHelper.kill(clientPort); +//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+") - DONE\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY } catch (Throwable t) { +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY log.error("Could not kill zookeeper: clientPort=" + clientPort +//BTM - FOR_ZOOKEEPER_SMART_PROXY + " : " + t, t); +//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+") - DONE\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY } +//BTM - FOR_ZOOKEEPER_SMART_PROXY - END - try { - - // clear out everything in zookeeper for this federation. 
-//BTM log.warn("\n---------------- JiniServicesHelper.destroy BEGIN ZOOKEEPER DELETE\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.destroy BEGIN ZOOKEEPER DELETE\n"); - zookeeper.delete(zooConfig.zroot, -1/* version */); -//BTM log.warn("\n---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); - - } catch (Exception e) { - - // ignore. - log.warn("zroot=" + zooConfig.zroot + " : " - + e.getLocalizedMessage(), e); - - } - - } - - try { - -//BTM log.warn("\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+")\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+")\n"); - - ZooHelper.kill(clientPort); - -//BTM log.warn("\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+") - DONE\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+") - DONE\n"); - - } catch (Throwable t) { - - log.error("Could not kill zookeeper: clientPort=" + clientPort - + " : " + t, t); - } - if (zooDataDir != null && zooDataDir.exists()) { /* * Wait a bit and then try and delete the zookeeper directory. 
Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/transaction/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/transaction/ServiceImpl.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/transaction/ServiceImpl.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -349,7 +349,13 @@ } //Service export and proxy creation - ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0); + String exportIpAddr = + NicUtil.getIpAddress + ( "default.nic", + ConfigDeployUtil.getString("node.serviceNetwork"), + false ); + ServerEndpoint endpoint = + TcpServerEndpoint.getInstance(exportIpAddr, 0); InvocationLayerFactory ilFactory = new BasicILFactory(); Exporter defaultExporter = new BasicJeriExporter(endpoint, ilFactory, Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -47,6 +47,7 @@ import com.bigdata.service.proxy.ClientRunnableBuffer; import com.bigdata.util.config.ConfigDeployUtil; import com.bigdata.util.config.LogUtil; +import com.bigdata.util.config.NicUtil; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -541,7 +542,6 @@ throw new NullPointerException("null entryName"); } Exporter exporter = null; - ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0); InvocationLayerFactory ilFactory = new BasicILFactory(); Exporter defaultExporter = getExporter(defaultEnableDgc, defaultKeepAlive); @@ -561,8 +561,17 @@ boolean keepAlive) { - Exporter exporter = null; - ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0); + ServerEndpoint endpoint = null; + try { + String exportIpAddr = + NicUtil.getIpAddress + ( 
"default.nic", + ConfigDeployUtil.getString("node.serviceNetwork"), + false ); + endpoint = TcpServerEndpoint.getInstance(exportIpAddr, 0); + } catch(Exception e) { + endpoint = TcpServerEndpoint.getInstance(0); + } InvocationLayerFactory ilFactory = new BasicILFactory(); return new BasicJeriExporter (endpoint, ilFactory, enableDgc, keepAlive); Modified: branches/dev-btm/bigdata-jini/src/resources/config/bigdataStandaloneTesting.config =================================================================== --- branches/dev-btm/bigdata-jini/src/resources/config/bigdataStandaloneTesting.config 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/resources/config/bigdataStandaloneTesting.config 2010-12-09 22:58:55 UTC (rev 4001) @@ -152,7 +152,8 @@ * * Note: The default policy is completely open. */ - private static policy = "policy.all"; +//BTM private static policy = "policy.all"; + private static policy = ConfigMath.getAbsolutePath(new File("policy.all")); /** * Where jini is installed. Modified: branches/dev-btm/bigdata-jini/src/test/com/bigdata/jini/start/AbstractFedZooTestCase.java =================================================================== --- branches/dev-btm/bigdata-jini/src/test/com/bigdata/jini/start/AbstractFedZooTestCase.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/test/com/bigdata/jini/start/AbstractFedZooTestCase.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -154,8 +154,7 @@ // if necessary, start zookeeper (a server instance). 
//BTM - PRE_ZOOKEEPER_SMART_PROXY - BEGIN //BTM - PRE_ZOOKEEPER_SMART_PROXY ZookeeperProcessHelper.startZookeeper(config, listener); - ZookeeperProcessHelper.startZookeeper(com.bigdata.quorum.ServiceImpl.class, config, listener); -//ZookeeperProcessHelper.startZookeeper(org.apache.zookeeper.server.quorum.QuorumPeerMain.class, config, listener); + ZookeeperProcessHelper.startZookeeper(com.bigdata.quorum.ServiceImpl.class, config, listener);//was QuorumPeerMain.class //BTM - PRE_ZOOKEEPER_SMART_PROXY - END /* Modified: branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.java =================================================================== --- branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -155,8 +155,9 @@ try { helper = new JiniServicesHelper(args, serviceImplRemote); - +System.out.println("\n\n------------------------------------- TestMappedRDFDataLoadMaster helper.start BEGIN --------\n"); helper.start(); +System.out.println("\n\n------------------------------------- TestMappedRDFDataLoadMaster helper.start END_1 --------\n"); //BTM - PRE_CLIENT_SERVICE - BEGIN //BTM - PRE_CLIENT_SERVICE new MappedRDFDataLoadMaster(helper.getFederation()).execute(); @@ -172,6 +173,7 @@ //BTM - PRE_CLIENT_SERVICE - END } finally { +System.out.println("\n\n------------------------------------- TestMappedRDFDataLoadMaster helper.start END_2 --------\n"); // delete the temp file containing the federation configuration. 
tempConfigFile.delete(); Modified: branches/dev-btm/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java =================================================================== --- branches/dev-btm/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -60,6 +60,7 @@ //BTM - FOR_ZOOKEEPER_SMART_PROXY - BEGIN import com.bigdata.service.QuorumPeerService; +import com.bigdata.util.config.NicUtil; import com.sun.jini.admin.DestroyAdmin; import net.jini.admin.Administrable; import net.jini.core.discovery.LookupLocator; @@ -176,7 +177,7 @@ final int leaderPort = getPort(3888/* suggestedPort */); //BTM - PRE_ZOOKEEPER_SMART_PROXY - BEGIN //BTM - PRE_ZOOKEEPER_SMART_PROXY final String servers = "1=localhost:" + peerPort + ":" + leaderPort; - hostname = com.bigdata.util.config.NicUtil.getIpAddress("default.nic", "default", true); + hostname = NicUtil.getIpAddress("default.nic", "default", true); final String servers = "1="+hostname+":" + peerPort + ":" + leaderPort; //BTM - PRE_ZOOKEEPER_SMART_PROXY - END This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |