From: <btm...@us...> - 2010-11-14 16:46:02
|
Revision: 3948 http://bigdata.svn.sourceforge.net/bigdata/?rev=3948&view=rev Author: btmurphy Date: 2010-11-14 16:45:55 +0000 (Sun, 14 Nov 2010) Log Message: ----------- [branch dev-btm]: CHECKPOINT - phase 1 of callable executor (client service) smart proxy work. Added code to deal with the new asynchronous initializaztion of the shard (data) service; that is, code that tests to determine whether the shard service is not ready because it's still initializing, and then performs a set of retries to allow the shard service to complete its startup processing Modified Paths: -------------- branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/EmbeddedShardLocator.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestBigdataClient.java Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/EmbeddedShardLocator.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/EmbeddedShardLocator.java 2010-11-12 21:29:20 UTC (rev 3947) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/EmbeddedShardLocator.java 2010-11-14 16:45:55 UTC (rev 3948) @@ -1338,10 +1338,10 @@ /* * Note: By setting this to null we are indicating to * the RegisterIndexTask on the data service that it - * needs to set the resourceMetadata[] when the index is - * actually registered based on the live journal as of - * the when the task actually executes on the data - * service. + * needs to set the resourceMetadata[] when the index + * is actually registered based on the live journal + * as of the when the task actually executes on the + * data service. */ null,//[resources] Signal to the RegisterIndexTask. null //[cause] Signal to RegisterIndexTask @@ -1350,11 +1350,53 @@ // */ // ,"createScaleOutIndex(name="+scaleOutIndexName+") " )); - - dataServices[i].registerIndex - (Util.getIndexPartitionName(scaleOutIndexName, - pmd.getPartitionId()), - md); + + // The shard service (as currently implemented) may not be + // completely initialized if it is just being started + // when this method is called (for example, in a test + // environment). This is because the shard service creates + // a StoreManager (via a Resource), which depends on + // discovering a transaction service; and it sets up + // counters, which depend on discovering a load balancer. + // Thus, to address the case where the shard service is + // not yet ready, test for such a situation; and apply a + // retry-to-failure strategy + boolean registered = false; + try { + dataServices[i].registerIndex + ( Util.getIndexPartitionName(scaleOutIndexName, + pmd.getPartitionId()), + md ); + registered = true; + } catch(Throwable t1) { + if ( !Util.causeNoSuchObject(t1) ) { + throw new Exception(t1); + } + //wait for data service to finish initializing + int nWait = 5; + for(int n=0; n<nWait; n++) { + Util.delayMS(1000L); + try { + dataServices[i].registerIndex + ( Util.getIndexPartitionName + (scaleOutIndexName, + pmd.getPartitionId()), + md ); + registered = true; + break; + } catch(Throwable t2) { + if ( !Util.causeNoSuchObject(t2) ) { + throw new Exception(t2); + } + } + } + } + if (!registered) {// try one last time + dataServices[i].registerIndex + ( Util.getIndexPartitionName(scaleOutIndexName, + pmd.getPartitionId()), + md ); + } partitions[i] = pmd; } Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java 2010-11-12 21:29:20 UTC (rev 3947) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java 2010-11-14 16:45:55 UTC (rev 3948) @@ -273,8 +273,8 @@ ((this.localResources).getScheduledExecutor()) .scheduleWithFixedDelay (deferredInitTask, - 20L*1000L,//initial delay - 30L*1000L,//period + 1L*1000L,//initial delay + 3L*1000L,//period TimeUnit.MILLISECONDS); } } @@ -318,7 +318,9 @@ * not complete within a timeout. */ public boolean isOpen() { - return ( (concurrencyMgr != null) && (concurrencyMgr.isOpen()) ); + return ( (concurrencyMgr != null) && + (concurrencyMgr.isOpen()) && + deferredInitDone ); } synchronized public void shutdown() { Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java 2010-11-12 21:29:20 UTC (rev 3947) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java 2010-11-14 16:45:55 UTC (rev 3948) @@ -86,6 +86,7 @@ import java.io.File; import java.io.IOException; +import java.rmi.NoSuchObjectException; import java.rmi.RemoteException; import java.util.ArrayList; import java.util.Arrays; @@ -188,7 +189,7 @@ throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); embeddedShardService.registerIndex(name, metadata); } @@ -196,14 +197,14 @@ throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); embeddedShardService.dropIndex(name); } public IBlock readBlock(IResourceMetadata resource, long addr) throws RemoteException, IOException { - readyState.check(); + readyCheck(); return embeddedShardService.readBlock(resource, addr); } @@ -211,7 +212,7 @@ throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); return embeddedShardService.getIndexMetadata(name, timestamp); } @@ -225,7 +226,7 @@ throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); return embeddedShardService.rangeIterator (tx, name, fromKey, toKey, capacity, flags, filter); } @@ -234,7 +235,7 @@ public <T> Future<T> submit(IDataServiceCallable<T> task) throws RemoteException { - readyState.check(); + readyCheck(); Exporter exporter = null; try { exporter = Util.getExporter(config, @@ -256,7 +257,7 @@ public Future submit(long tx, String name, IIndexProcedure proc) throws RemoteException { - readyState.check(); + readyCheck(); Exporter exporter = null; try { exporter = Util.getExporter(config, @@ -279,19 +280,19 @@ public boolean purgeOldResources(long timeout, boolean truncateJournal) throws RemoteException, InterruptedException { - readyState.check(); + readyCheck(); return embeddedShardService.purgeOldResources(timeout,truncateJournal); } public void setReleaseTime(long releaseTime) throws RemoteException, IOException { - readyState.check(); + readyCheck(); embeddedShardService.setReleaseTime(releaseTime); } public void abort(long tx) throws RemoteException, IOException { - readyState.check(); + readyCheck(); embeddedShardService.abort(tx); } @@ -299,14 +300,14 @@ throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); return embeddedShardService.singlePhaseCommit(tx); } public void prepare(long tx, long revisionTime) throws RemoteException, IOException, Throwable { - readyState.check(); + readyCheck(); embeddedShardService.prepare(tx, revisionTime); } @@ -314,19 +315,19 @@ throws RemoteException, IOException, InterruptedException, ExecutionException { - readyState.check(); + readyCheck(); embeddedShardService.forceOverflow(immediate, compactingMerge); } public long getAsynchronousOverflowCounter() throws RemoteException, IOException { - readyState.check(); + readyCheck(); return embeddedShardService.getAsynchronousOverflowCounter(); } public boolean isOverflowActive() throws RemoteException, IOException { - readyState.check(); + readyCheck(); return embeddedShardService.isOverflowActive(); } @@ -668,10 +669,40 @@ readyState.ready();//ready to accept calls from clients } + // Private methods + + // Throws NoSuchObjectException if the service has either + // not completed initialization and registration with the + // lookup service, or not created the concurrency manager + // and resource (which depends on discovering the transaction + // service) + private void readyCheck() { + readyState.check();//completed service init? + + // created concurrency and resource managers? + if ( !embeddedShardService.isOpen() ) { + throw new RemoteExceptionWrapper + (new NoSuchObjectException("not ready")); + } + } + private void shutdownDo(ShutdownType type) { (new ShutdownThread(type)).start(); } + private void killDo(int status) { + String[] groups = ((DiscoveryGroupManagement)ldm).getGroups(); + LookupLocator[] locs = ((DiscoveryLocatorManagement)ldm).getLocators(); + logger.log(Level.INFO, killStr+" [groups=" + +Util.writeGroupArrayToString(groupsToJoin) + +", locators=" + +Util.writeArrayElementsToString(locatorsToJoin)+"]"); + + System.exit(status); + } + + // Nested classes + /** * Used to shutdown the service asynchronously. */ @@ -745,15 +776,15 @@ } } - private void killDo(int status) { - String[] groups = ((DiscoveryGroupManagement)ldm).getGroups(); - LookupLocator[] locs = ((DiscoveryLocatorManagement)ldm).getLocators(); - logger.log(Level.INFO, killStr+" [groups=" - +Util.writeGroupArrayToString(groupsToJoin) - +", locators=" - +Util.writeArrayElementsToString(locatorsToJoin)+"]"); - - System.exit(status); + private static class RemoteExceptionWrapper extends RuntimeException { + private static final long serialVersionUID = 1L; + private final RemoteException wrapped; + public RemoteExceptionWrapper(RemoteException wrapped) { + this.wrapped = wrapped; + } + private Object writeReplace() { + return wrapped; + } } /** Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java 2010-11-12 21:29:20 UTC (rev 3947) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java 2010-11-14 16:45:55 UTC (rev 3948) @@ -73,6 +73,7 @@ import net.jini.lookup.ServiceDiscoveryManager; import java.io.IOException; +import java.rmi.NoSuchObjectException; import java.rmi.Remote; import java.rmi.server.ExportException; import java.util.Collection; @@ -110,6 +111,26 @@ return min; } + public static void delayMS(long nMS) { + try { + Thread.sleep(nMS); + } catch (InterruptedException e) { } + } + + public static boolean causeNoSuchObject(Throwable t) { + if (t instanceof NoSuchObjectException) return true; + + // test cause chain for NoSuchObjectException + Throwable cause = t.getCause(); + while ( (cause != null) && + !(cause instanceof NoSuchObjectException) ) + { + cause = cause.getCause(); + } + if (cause == null) return false; + return (cause instanceof NoSuchObjectException); + } + /* Convenience method that can be called when a service exits, or * when failure occurs during the service's initialization process. * This method un-does any work that may have already been completed; Modified: branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestBigdataClient.java =================================================================== --- branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestBigdataClient.java 2010-11-12 21:29:20 UTC (rev 3947) +++ branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestBigdataClient.java 2010-11-14 16:45:55 UTC (rev 3948) @@ -132,7 +132,33 @@ metadata.setDeleteMarkers(true); - fed.registerIndex(metadata); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE fed.registerIndex(metadata); + boolean registered = false; + try { + fed.registerIndex(metadata); + registered = true; + } catch(Throwable t1) { + if ( !Util.causeNoSuchObject(t1) ) { + throw new Exception(t1); + } + //wait for data service to finish initializing + int nWait = 5; + for(int i=0; i<nWait; i++) { + Util.delayMS(1000L); + try { + fed.registerIndex(metadata); + registered = true; + break; + } catch(Throwable t2) { + if ( !Util.causeNoSuchObject(t2) ) { + throw new Exception(t2); + } + } + } + } + assertTrue("failed to register metadata", registered); +//BTM - PRE_CLIENT_SERVICE - END final IIndex ndx = fed.getIndex(name, ITx.UNISOLATED); @@ -166,9 +192,7 @@ if (dataService0 == null) { for(int i=0; i<nWait; i++) { - try { - Thread.sleep(1L*1000L); - } catch (InterruptedException e) { } + Util.delayMS(1000L); dataService0 = helper.getDataService0(); if (dataService0 != null) break; } @@ -182,9 +206,7 @@ if (dataService1 == null) { for(int i=0; i<nWait; i++) { - try { - Thread.sleep(1L*1000L); - } catch (InterruptedException e) { } + Util.delayMS(1000L); dataService1 = helper.getDataService1(); if (dataService1 != null) break; } @@ -194,19 +216,60 @@ } //BTM - END --------------------------------------------------- - final UUID indexUUID = fed.registerIndex( metadata, // - // separator keys. - new byte[][] { - new byte[]{}, - TestKeyBuilder.asSortKey(500) - },// - // data service assignments. - new UUID[] { // -//BTM helper.getDataService0().getServiceUUID(),// -//BTM helper.getDataService1().getServiceUUID() // -dataService0UUID, -dataService1UUID - }); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE final UUID indexUUID = fed.registerIndex( metadata, // +//BTM - PRE_CLIENT_SERVICE // separator keys. +//BTM - PRE_CLIENT_SERVICE new byte[][] { +//BTM - PRE_CLIENT_SERVICE new byte[]{}, +//BTM - PRE_CLIENT_SERVICE TestKeyBuilder.asSortKey(500) +//BTM - PRE_CLIENT_SERVICE },// +//BTM - PRE_CLIENT_SERVICE // data service assignments. +//BTM - PRE_CLIENT_SERVICE new UUID[] { // +//BTM - PRE_CLIENT_SERVICE dataService0UUID, +//BTM - PRE_CLIENT_SERVICE dataService1UUID +//BTM - PRE_CLIENT_SERVICE }); + UUID indexUUID = null; + boolean registered = false; + try { + indexUUID = + fed.registerIndex + ( metadata, + new byte[][] + { new byte[]{}, + TestKeyBuilder.asSortKey(500) }, + new UUID[] { dataService0UUID, + dataService1UUID } + ); + registered = true; + } catch(Throwable t1) { + if ( !Util.causeNoSuchObject(t1) ) { + throw new Exception(t1); + } + //wait for data service to finish initializing + nWait = 5; + for(int i=0; i<nWait; i++) { + Util.delayMS(1000L); + try { + indexUUID = + fed.registerIndex + ( metadata, + new byte[][] + { new byte[]{}, + TestKeyBuilder.asSortKey(500) }, + new UUID[] { dataService0UUID, + dataService1UUID } + ); + registered = true; + break; + } catch(Throwable t2) { + if ( !Util.causeNoSuchObject(t2) ) { + throw new Exception(t2); + } + } + } + } + assertTrue("failed to register metadata", registered); +//BTM - PRE_CLIENT_SERVICE - END final IIndex ndx = fed.getIndex(name, ITx.UNISOLATED); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |