From: <btm...@us...> - 2010-11-11 22:00:49
|
Revision: 3937 http://bigdata.svn.sourceforge.net/bigdata/?rev=3937&view=rev Author: btmurphy Date: 2010-11-11 22:00:41 +0000 (Thu, 11 Nov 2010) Log Message: ----------- [branch dev-btm]: CHECKPOINT - completed phase 1 of the callable executor (client) service smart proxy work. Made changes to allow smart proxy and/or remote service implementations to be started by the ServicesManagerService; re-ordered service restarts in RestartPersistentServices because the shard (data) service depends on the transaction service and the load balancer; added DeferredInitTask to EmbeddedShardService; added the necessary jar files to the classpath of the shard locator and callable executor services in boot-processes.xml. Modified Paths: -------------- branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java branches/dev-btm/bigdata/src/resources/logging/log4j.properties branches/dev-btm/bigdata-jini/src/java/com/bigdata/boot/config/boot-processes.xml branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/executor.config branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/logging.properties branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/RestartPersistentServices.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/ServiceConfigurationZNodeMonitorTask.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/config/ServiceConfiguration.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/logging.properties branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/shardlocator.config branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/logging.properties branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/shard.config branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.config branches/dev-btm/src/resources/config/bigdataCluster.config Modified: 
branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java 2010-11-11 22:00:41 UTC (rev 3937) @@ -1523,6 +1523,10 @@ // Wait no more than N seconds for discovery int nWait = 120; boolean discoveredTxnSrvc = false; + if (log.isDebugEnabled()) { + log.debug("waiting for transaction " + +"service discovery ..."); + } for(int i=0; i<nWait; i++) { if (discoveryMgr.getTransactionService() != null) { discoveredTxnSrvc = true; @@ -1531,24 +1535,19 @@ try { Thread.sleep(1000L); } catch(InterruptedException ie) { } + } + if(discoveredTxnSrvc) { if (log.isDebugEnabled()) { - log.debug("waiting for transaction " - +"service discovery"); + log.debug("StoreManager - discovered " + +"transaction service"); } - if(discoveredTxnSrvc) { - if (log.isDebugEnabled()) { - log.debug("discovered transaction service"); - } - } else { - log.warn("transaction service unreachable"); - }//endif(discoveredTxnSrvc) - }//endloop(nWait) -//BTM if(discoveredTxnSrvc) { -//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> TRANSACTION SERVICE DISCOVERED"); -//BTM }else{ -//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> TRANSACTION SERVICE UNREACHABLE\n"); -//BTM } +log.warn("DISCOVERED TRANSACTION SERVICE"); + } else { + log.warn("StoreManager - transaction " + +"service unreachable"); + } + } catch (UnsupportedOperationException ex) { //BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> FEDERATION UNAVAILABLE - test case?\n"); log.warn("Federation not available - running in test case?"); Modified: branches/dev-btm/bigdata/src/resources/logging/log4j.properties 
=================================================================== --- branches/dev-btm/bigdata/src/resources/logging/log4j.properties 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata/src/resources/logging/log4j.properties 2010-11-11 22:00:41 UTC (rev 3937) @@ -215,3 +215,4 @@ log4j.logger.com.bigdata.transaction=DEBUG log4j.logger.com.bigdata.metadata=DEBUG log4j.logger.com.bigdata.shard=DEBUG +log4j.logger.com.bigdata.executor=DEBUG Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/boot/config/boot-processes.xml =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/boot/config/boot-processes.xml 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/boot/config/boot-processes.xml 2010-11-11 22:00:41 UTC (rev 3937) @@ -77,7 +77,7 @@ <javaprop name="java.util.logging.config.file" value="${bigdata.configDir}/logging/logging.properties"/> - <property name="java.classpath" value="${bootLauncherClasspath}${:}lib/fastutil.jar${:}lib/dsiutils.jar${:}lib/cweb-extser.jar"/> + <property name="java.classpath" value="${bootLauncherClasspath}${:}lib/fastutil.jar${:}lib/dsiutils.jar${:}lib/cweb-extser.jar${:}lib/ctc_utils.jar"/> <property name="java.app.mainclass" value="com.bigdata.boot.starter.SingleNonActivatableServiceStarter"/> <arg value="${bigdata.configDir}/policy/service.policy"/> @@ -113,7 +113,7 @@ <javaprop name="java.util.logging.config.file" value="${bigdata.configDir}/logging/logging.properties"/> - <property name="java.classpath" value="${bootLauncherClasspath}"/> + <property name="java.classpath" value="${bootLauncherClasspath}${:}lib/fastutil.jar${:}lib/dsiutils.jar${:}lib/cweb-extser.jar${:}lib/ctc_utils.jar"/> <property name="java.app.mainclass" value="com.bigdata.boot.starter.SingleNonActivatableServiceStarter"/> <arg value="${bigdata.configDir}/policy/service.policy"/> Modified: 
branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/executor.config =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/executor.config 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/executor.config 2010-11-11 22:00:41 UTC (rev 3937) @@ -11,6 +11,10 @@ import net.jini.core.discovery.LookupLocator; import net.jini.discovery.LookupDiscoveryManager; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; + import com.bigdata.util.config.NicUtil; import com.bigdata.util.config.ConfigDeployUtil; @@ -106,3 +110,15 @@ (com.bigdata.executor.serverExporterTcpServerEndpoint, com.bigdata.executor.serverILFactory, false, false); } + +//NOTE: remove once dynamic discovery of zookeeper is added +org.apache.zookeeper.ZooKeeper { + + zroot = ConfigDeployUtil.getString("federation.name"); + + servers = com.bigdata.executor.serverExporterIpAddr+":2888:3888"; + + acl = new ACL[] { + new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone")) + }; +} Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/logging.properties =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/logging.properties 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/logging.properties 2010-11-11 22:00:41 UTC (rev 3937) @@ -37,5 +37,4 @@ #log4j.logger.com.bigdata.executor=DEBUG #log4j.logger.com.bigdata.executor.EmbeddedCallableExecutor=DEBUG -#log4j.logger.com.bigdata.executor.EmbeddedClientIndexStore=DEBUG Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/RestartPersistentServices.java =================================================================== --- 
branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/RestartPersistentServices.java 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/RestartPersistentServices.java 2010-11-11 22:00:41 UTC (rev 3937) @@ -19,6 +19,9 @@ import com.bigdata.service.jini.JiniFederation; import com.bigdata.util.InnerCause; +//BTM - FOR_CLIENT_SERVICE +import java.util.ArrayList; + /** * Task restarts persistent physical services that should be running on this * host but which are not discoverable using jini (not found when we query for @@ -154,7 +157,8 @@ + BigdataZooDefs.CONFIG; // these are the ServiceConfigurations. - final List<String> serviceConfigZNodes; +//BTM - PRE_CLIENT_SERVICE final List<String> serviceConfigZNodes; + List<String> serviceConfigZNodes; try { serviceConfigZNodes = zookeeper.getChildren(zconfig, false); @@ -171,8 +175,109 @@ if (log.isInfoEnabled()) log.info("Considering " + serviceConfigZNodes.size() + " service configurations"); -System.out.println("\n*********************************************************"); -System.out.println("*** RestartPersistentServices.runOnce: Considering " + serviceConfigZNodes.size()+ " service configurations"); +System.out.println("\n*** RestartPersistentServices - BEGIN *********************************************************\n"); +for (String serviceConfigZNode : serviceConfigZNodes) { + System.out.println("*** RestartPersistentServices.runOnce: serviceConfigZNode = "+ serviceConfigZNode); +} +System.out.println("\n*** RestartPersistentServices.runOnce: RE-ORDER - DATA SERVICE LAST\n"); +//BTM - FOR_CLIENT_SERVICE - BEGIN --------------------------------------- + // re-order because shard (data) service waits for transaction + // service and load balancer service + List<String> tmpList = new ArrayList<String>(); + + //1. 
transaction service(s) + for (String serviceConfigZNode : serviceConfigZNodes) { + if ( (serviceConfigZNode.equals + ("com.bigdata.transaction.ServiceImpl")) || + (serviceConfigZNode.equals + ("com.bigdata.service.jini.TransactionServer")) ) + { + tmpList.add(serviceConfigZNode); + } + } + //2. load balancer service(s) + for (String serviceConfigZNode : serviceConfigZNodes) { + if ( serviceConfigZNode.equals + ("com.bigdata.loadbalancer.ServiceImpl") || + (serviceConfigZNode.equals + ("com.bigdata.service.jini.LoadBalancerServer")) ) + { + tmpList.add(serviceConfigZNode); + } + } + //3. shard locator (metadata) service(s) + for (String serviceConfigZNode : serviceConfigZNodes) { + if ( serviceConfigZNode.equals + ("com.bigdata.metadata.ServiceImpl") || + (serviceConfigZNode.equals + ("com.bigdata.service.jini.MetadataServer")) ) + { + tmpList.add(serviceConfigZNode); + } + } + //4. callable executor (client) service(s) + for (String serviceConfigZNode : serviceConfigZNodes) { + if ( serviceConfigZNode.equals + ("com.bigdata.executor.ServiceImpl") || + (serviceConfigZNode.equals + ("com.bigdata.service.jini.ClientServer")) ) + { + tmpList.add(serviceConfigZNode); + } + } + //5. shard (data) service(s) + for (String serviceConfigZNode : serviceConfigZNodes) { + if ( serviceConfigZNode.equals + ("com.bigdata.shard.ServiceImpl") || + (serviceConfigZNode.equals + ("com.bigdata.service.jini.DataServer")) ) + { + tmpList.add(serviceConfigZNode); + } + } + //6. 
add anything that's none of the above + for (String serviceConfigZNode : serviceConfigZNodes) { + + if ( !(serviceConfigZNode.equals + ("com.bigdata.transaction.ServiceImpl")) && + !(serviceConfigZNode.equals + ("com.bigdata.service.jini.TransactionServer")) && + + !(serviceConfigZNode.equals + ("com.bigdata.loadbalancer.ServiceImpl")) && + !(serviceConfigZNode.equals + ("com.bigdata.service.jini.LoadBalancerServer")) && + + !(serviceConfigZNode.equals + ("com.bigdata.metadata.ServiceImpl")) && + !(serviceConfigZNode.equals + ("com.bigdata.service.jini.MetadataServer")) && + + !(serviceConfigZNode.equals + ("com.bigdata.executor.ServiceImpl")) && + !(serviceConfigZNode.equals + ("com.bigdata.service.jini.ClientServer")) && + + !(serviceConfigZNode.equals + ("com.bigdata.shard.ServiceImpl")) && + !(serviceConfigZNode.equals + ("com.bigdata.service.jini.DataServer")) ) + { + tmpList.add(serviceConfigZNode); + } + } + if (tmpList.size() == serviceConfigZNodes.size()) { + serviceConfigZNodes = tmpList; + } else { + log.warn("reordered list size ["+tmpList.size() + +"] != serviceConfigZNodes size [" + +serviceConfigZNodes.size()+"]"); + } +System.out.println("*** RestartPersistentServices.runOnce: Considering " + serviceConfigZNodes.size()+ " service configurations\n"); +for (String serviceConfigZNode : serviceConfigZNodes) { + System.out.println("*** RestartPersistentServices.runOnce AFTER RE-ORDER: serviceConfigZNode = "+ serviceConfigZNode); +} +//BTM - FOR_CLIENT_SERVICE - END ------------------------------------- for (String serviceConfigZNode : serviceConfigZNodes) { @@ -253,7 +358,7 @@ } } -System.out.println("*********************************************************\n"); +System.out.println("\n*** RestartPersistentServices - END *********************************************************\n"); // Success. 
return true; Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/ServiceConfigurationZNodeMonitorTask.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/ServiceConfigurationZNodeMonitorTask.java 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/ServiceConfigurationZNodeMonitorTask.java 2010-11-11 22:00:41 UTC (rev 3937) @@ -323,7 +323,7 @@ if (config.serviceCount != children.size()) { // adjust the #of logical service instances (blocks). -System.out.println("GGGG ServiceConfigurationZNodeMonitorTask.runWithLock: *** NEW LOGICAL SERVICE TASK"); +System.out.println("GGGG ServiceConfigurationZNodeMonitorTask.runWithLock: *** NEW LOGICAL SERVICE TASK >>> "+serviceConfigZPath); config.newLogicalServiceTask(fed, listener, serviceConfigZPath, children).call(); Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/config/ServiceConfiguration.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/config/ServiceConfiguration.java 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/config/ServiceConfiguration.java 2010-11-11 22:00:41 UTC (rev 3937) @@ -494,7 +494,6 @@ * if the service detectably did not start. */ public V call() throws Exception { -System.out.println("*** ServiceConfiguration: call() [className="+className+"]"); if (log.isInfoEnabled()) log.info("config: " + this); @@ -532,7 +531,9 @@ TimeUnit.MILLISECONDS); // attempt to detect a service start failure. 
+System.out.println("\n*** ServiceConfiguration: call() [className="+className+"] >>> awaitServiceStart [timeout="+timeout+" ms] - BEGIN\n"); awaitServiceStart(processHelper, timeout, TimeUnit.MILLISECONDS); +System.out.println("\n*** ServiceConfiguration: call() [className="+className+"] >>> awaitServiceStart [timeout="+timeout+" ms] - END\n"); } catch (InterruptedException ex) { Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/logging.properties =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/logging.properties 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/logging.properties 2010-11-11 22:00:41 UTC (rev 3937) @@ -37,5 +37,4 @@ #log4j.logger.com.bigdata.metadata=DEBUG #log4j.logger.com.bigdata.metadata.EmbeddedShardLocator=DEBUG -#log4j.logger.com.bigdata.journal.EmbeddedIndexStore=DEBUG Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/shardlocator.config =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/shardlocator.config 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/shardlocator.config 2010-11-11 22:00:41 UTC (rev 3937) @@ -11,6 +11,10 @@ import net.jini.core.discovery.LookupLocator; import net.jini.discovery.LookupDiscoveryManager; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; + import com.bigdata.util.config.NicUtil; import com.bigdata.util.config.ConfigDeployUtil; @@ -89,3 +93,15 @@ (com.bigdata.metadata.serverExporterTcpServerEndpoint, com.bigdata.metadata.serverILFactory, false, false); } + +//NOTE: remove once dynamic discovery of zookeeper is added +org.apache.zookeeper.ZooKeeper { + + zroot = 
ConfigDeployUtil.getString("federation.name"); + + servers = com.bigdata.metadata.serverExporterIpAddr+":2888:3888"; + + acl = new ACL[] { + new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone")) + }; +} Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java 2010-11-11 22:00:41 UTC (rev 3937) @@ -156,12 +156,14 @@ //BTM - BEGIN - fields from AbstractFederation ------------------------------- private ScheduledFuture<?> eventTaskFuture; private ScheduledFuture<?> lbsReportingTaskFuture; + private long lbsReportingPeriod; private AbstractHTTPD httpServer; private String httpServerUrl;//URL used to access the httpServer private int httpdPort; //BTM - END - fields from AbstractFederation ------------------------------- -String dbgFlnm="EmbeddedShardService.out"; + private ScheduledFuture deferredInitTaskFuture; + private boolean deferredInitDone = false; protected EmbeddedShardService (final UUID serviceUUID, @@ -236,116 +238,45 @@ maxStaleLocatorRetries, logger, this.properties); - String httpServerPath = - (this.localResources).getServiceCounterPathPrefix(); - try { - this.httpServerUrl = - "http://" - +AbstractStatisticsCollector.fullyQualifiedHostName - +":"+this.httpdPort+"/?path=" - +URLEncoder.encode(httpServerPath, "UTF-8"); - } catch(java.io.UnsupportedEncodingException e) { - logger.warn("failed to initialize httpServerUrl", e); - } -System.out.println("\nEmbeddedShardService >>> NEW StoreManager - BEGIN"); - this.resourceMgr = - new ShardResourceManager(this, - this.discoveryMgr, - this.localResources, - this.indexMgr, - this.properties); -System.out.println("\nEmbeddedShardService >>> NEW StoreManager - END"); + this.lbsReportingPeriod = 
lbsReportingPeriod; - this.localTransactionMgr = new LocalTransactionManager(discoveryMgr); - this.concurrencyMgr = - new ConcurrencyManager(this.properties, - this.localTransactionMgr, - this.resourceMgr); - //WARN: circular refs - (this.resourceMgr).setConcurrencyManager(this.concurrencyMgr); - (this.indexMgr).setConcurrencyManager(this.concurrencyMgr); + // Note that this service employs a StoreManager (in the + // ResourceManager) whose creation depends on the existence + // of a transaction service. Additionally, this service + // also employs counters dependent on the existence of + // a load balancer to which the counters send events. + // Unfortunately, when the ServicesManagerService is used + // to start this service, these dependencies can cause + // problems for the ServicesManagerService. This is because + // the order the services are started by the ServicesManagerService + // can be random, and if this service is the first service + // the ServicesManagerService attempts to start (or whose + // starting is attempted before the transaction service + // and/or the load balancer), then unless this service + // returns an indication to the ServicesManagerService that + // it has successfully started within the time period + // expected, the ServicesManagerService will declare that + // this service is faulty and kill the process in which + // this service was started. To address this issue, this + // service executes an instance of DeferredInitTask to + // create the ResourceManager and set up the counters and + // events asynchronously; which allows the transaction + // service and load balancer to be started and discovered + // after this service has been started by the + // ServicesManagerService. 
-//BTM - from AbstractFederation constructor and addScheduledTask - - //start event queue/sender task (sends events every 2 secs) - - long sendEventsDelay = 100L;//one-time initial delay - long sendEventsPeriod = 2000L; - this.eventTaskFuture = - (localResources.getScheduledExecutor()).scheduleWithFixedDelay - (localResources.getEventQueueSender(), - sendEventsDelay, - sendEventsPeriod, - TimeUnit.MILLISECONDS); - -//BTM - from AbstractFederation - start deferred tasks - - //start task to report counters to the load balancer - - LoadBalancerReportingTask lbsReportingTask = - new LoadBalancerReportingTask(this.resourceMgr, - this.concurrencyMgr, - this.localResources, - this.discoveryMgr, - logger); - this.lbsReportingTaskFuture = - ((this.localResources).getScheduledExecutor()) + deferredInitDone = deferredInit(); + if (!deferredInitDone) { + DeferredInitTask deferredInitTask = new DeferredInitTask(); + this.deferredInitTaskFuture = + ((this.localResources).getScheduledExecutor()) .scheduleWithFixedDelay - (lbsReportingTask, - lbsReportingPeriod,//initial delay - lbsReportingPeriod, + (deferredInitTask, + 20L*1000L,//initial delay + 30L*1000L,//period TimeUnit.MILLISECONDS); - - //start an http daemon from which interested parties can query - //counter and/or statistics information with http get commands - - try { - httpServer = - new HttpReportingServer(this.httpdPort, - this.resourceMgr, - this.concurrencyMgr, - this.localResources, - logger); - } catch (IOException e) { - logger.error("failed to start http server " - +"[port="+this.httpdPort - +", path="+httpServerPath+"]", e); - return; } - if(httpServer != null) { - if( logger.isDebugEnabled() ) { - logger.debug("started http daemon " - +"[access URL="+this.httpServerUrl+"]"); - } - // add counter reporting the access url to load balancer - ((this.localResources).getServiceCounterSet()) - .addCounter - (IServiceCounters.LOCAL_HTTPD, - new OneShotInstrument<String>(this.httpServerUrl)); - } - -//BTM - BEGIN 
ScaleOutIndexManager -//BTM - The call to embeddedIndexStore.didStart was previously -//BTM - commented out during the data service conversion. But -//BTM - the method didStart() on the original EmbeddedIndexStore and -//BTM - AbstractFederation calls the private method setupCounters; -//BTM - which seems to be important for at least the shard (data) -//BTM - service. The tests still passed without calling that method, -//BTM - but we should consider calling it at this point (the problem -//BTM - is that it waits on the resource manager to finish -//BTM - initializing, which waits on the transaction service to be -//BTM - discovered). Setting up these counters seem to be important -//BTM - only for the shard (data) service rather than the other -//BTM - services. So we should consider adding setupCounters to this -//BTM - class, and calling it here instead of calling -//BTM - embeddedIndexStore.didStart() or AbstractFederation.didStart(). -//BTM - -//BTM embeddedIndexStore.didStart(); - - setupCounters(); - -//BTM - END ScaleOutIndexManager } // Required by Service interface @@ -394,6 +325,11 @@ logger.warn("SSSS SHARD SERVICE EmbeddedShardService.shutdown"); if (!isOpen()) return; + //false ==> allow in-progress tasks to complete + if (deferredInitTaskFuture != null) { + deferredInitTaskFuture.cancel(false); + } + if (concurrencyMgr != null) { concurrencyMgr.shutdown(); } @@ -404,9 +340,12 @@ resourceMgr.shutdown(); } - //false ==> allow in-progress tasks to complete - lbsReportingTaskFuture.cancel(false); - eventTaskFuture.cancel(false); + if (lbsReportingTaskFuture != null) { + lbsReportingTaskFuture.cancel(false); + } + if (eventTaskFuture != null) { + eventTaskFuture.cancel(false); + } if (indexMgr != null) indexMgr.destroy(); if (localResources != null) { @@ -1026,6 +965,119 @@ return new File(resourceMgr.getDataDir(), "httpd.url"); } + private boolean deferredInit() { + + // StoreManager depends on the transaction service + if 
(discoveryMgr.getTransactionService() == null) return false; + + if (this.resourceMgr == null) { +System.out.println("\nEmbeddedShardService >>> NEW StoreManager - BEGIN"); + this.resourceMgr = + new ShardResourceManager(this, + this.discoveryMgr, + this.localResources, + this.indexMgr, + this.properties); +System.out.println("\nEmbeddedShardService >>> NEW StoreManager - END"); + + this.localTransactionMgr = + new LocalTransactionManager(discoveryMgr); + this.concurrencyMgr = + new ConcurrencyManager(this.properties, + this.localTransactionMgr, + this.resourceMgr); + //WARN: circular refs + (this.resourceMgr).setConcurrencyManager(this.concurrencyMgr); + (this.indexMgr).setConcurrencyManager(this.concurrencyMgr); + } + + // Events and counters depend on the load balancer + if (discoveryMgr.getLoadBalancerService() == null) return false; + +//BTM - from AbstractFederation - start deferred tasks + + //start task to report counters to the load balancer + + LoadBalancerReportingTask lbsReportingTask = + new LoadBalancerReportingTask(this.resourceMgr, + this.concurrencyMgr, + this.localResources, + this.discoveryMgr, + logger); + this.lbsReportingTaskFuture = + ((this.localResources).getScheduledExecutor()) + .scheduleWithFixedDelay + (lbsReportingTask, + lbsReportingPeriod,//initial delay + lbsReportingPeriod, + TimeUnit.MILLISECONDS); + + //start an http daemon from which interested parties can query + //counter and/or statistics information with http get commands + + String httpServerPath = + (this.localResources).getServiceCounterPathPrefix(); + try { + this.httpServerUrl = + "http://" + +AbstractStatisticsCollector.fullyQualifiedHostName + +":"+this.httpdPort+"/?path=" + +URLEncoder.encode(httpServerPath, "UTF-8"); + } catch(java.io.UnsupportedEncodingException e) { + logger.warn("failed to initialize httpServerUrl", e); + } + + + try { + httpServer = + new HttpReportingServer(this.httpdPort, + this.resourceMgr, + this.concurrencyMgr, + this.localResources, + 
logger); + } catch (IOException e) { + logger.error("failed to start http server " + +"[port="+this.httpdPort + +", path="+httpServerPath+"]", e); + return false; + } + if(httpServer != null) { + if( logger.isDebugEnabled() ) { + logger.debug("started http daemon " + +"[access URL="+this.httpServerUrl+"]"); + } + // add counter reporting the access url to load balancer + ((this.localResources).getServiceCounterSet()) + .addCounter + (IServiceCounters.LOCAL_HTTPD, + new OneShotInstrument<String>(this.httpServerUrl)); + } + +//BTM - BEGIN ScaleOutIndexManager Note +//BTM - The call to embeddedIndexStore.didStart was previously +//BTM - commented out during the data service conversion. But +//BTM - the method didStart() on the original EmbeddedIndexStore and +//BTM - AbstractFederation calls the private method setupCounters; +//BTM - which seems to be important for at least the shard (data) +//BTM - service. The tests still passed without calling that method, +//BTM - but we should consider calling it at this point (the problem +//BTM - is that it waits on the resource manager to finish +//BTM - initializing, which waits on the transaction service to be +//BTM - discovered). Setting up these counters seem to be important +//BTM - only for the shard (data) service rather than the other +//BTM - services. So we should consider adding setupCounters to this +//BTM - class, and calling it here instead of calling +//BTM - embeddedIndexStore.didStart() or AbstractFederation.didStart(). 
+//BTM - +//BTM embeddedIndexStore.didStart(); + + setupCounters(); + +//BTM - END ScaleOutIndexManager Note + + return true; + } + private void setupLoggingContext() { try { @@ -1120,8 +1172,37 @@ } } -//BTM - see the note at the end of this class' constructor + class DeferredInitTask implements Runnable { + public DeferredInitTask() { } + + public void run() { + try { + if (!deferredInitDone) { +System.out.println("\n*** EmbededShardService#DeferredInitTask: DO DEFERRED INIT \n"); + deferredInitDone = deferredInit(); + } else { +System.out.println("\n*** EmbededShardService#DeferredInitTask: DEFERRED INIT DONE >>> CANCELLING TASK\n"); + deferredInitDone = true; + if (deferredInitTaskFuture != null) { + deferredInitTaskFuture.cancel(false); + deferredInitTaskFuture = null; + } + } + } catch (Throwable t) { +System.out.println("\n*** EmbededShardService#DeferredInitTask: EXCEPTION >>> "+t+"\n"); + logger.error("deferred initialization failure", t); + deferredInitDone = true; + if (deferredInitTaskFuture != null) { + deferredInitTaskFuture.cancel(false); + deferredInitTaskFuture = null; + } + } + } + } + +//BTM - see the note at the end of the deferredInit method + /** * Sets up shard service specific counters. 
* <p> Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/logging.properties =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/logging.properties 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/logging.properties 2010-11-11 22:00:41 UTC (rev 3937) @@ -37,5 +37,4 @@ #log4j.logger.com.bigdata.shard=DEBUG #log4j.logger.com.bigdata.shard.EmbeddedShardService=DEBUG -#log4j.logger.com.bigdata.journal.EmbeddedIndexStore=DEBUG Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/shard.config =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/shard.config 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/shard.config 2010-11-11 22:00:41 UTC (rev 3937) @@ -11,6 +11,10 @@ import net.jini.core.discovery.LookupLocator; import net.jini.discovery.LookupDiscoveryManager; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; + import com.bigdata.util.config.NicUtil; import com.bigdata.util.config.ConfigDeployUtil; @@ -89,3 +93,15 @@ (com.bigdata.shard.serverExporterTcpServerEndpoint, com.bigdata.shard.serverILFactory, false, false); } + +//NOTE: remove once dynamic discovery of zookeeper is added +org.apache.zookeeper.ZooKeeper { + + zroot = ConfigDeployUtil.getString("federation.name"); + + servers = com.bigdata.shard.serverExporterIpAddr+":2888:3888"; + + acl = new ACL[] { + new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone")) + }; +} Modified: branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.config =================================================================== --- 
branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.config 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.config 2010-11-11 22:00:41 UTC (rev 3937) @@ -137,7 +137,7 @@ /* Template for matching the services to which the clients will be * distributed for execution. Normally you will specify - * IClientService as the interface to be discovered. While it is + * CallableExecutor as the interface to be discovered. While it is * possible to run tasks on an IDataService or even an * IMetadataService since they both implement IRemoteExecutor, it * is generally discouraged unless the tasks require explicit @@ -148,7 +148,8 @@ new ServiceTemplate( null, //serviceID new Class[]{ - com.bigdata.service.IClientService.class +//BTM com.bigdata.service.IClientService.class + com.bigdata.service.CallableExecutor.class }, null // attributes ), Modified: branches/dev-btm/src/resources/config/bigdataCluster.config =================================================================== --- branches/dev-btm/src/resources/config/bigdataCluster.config 2010-11-11 15:02:03 UTC (rev 3936) +++ branches/dev-btm/src/resources/config/bigdataCluster.config 2010-11-11 22:00:41 UTC (rev 3937) @@ -630,14 +630,15 @@ "org.apache.zookeeper.server.quorum.QuorumPeerMain", //BTM "com.bigdata.service.jini.TransactionServer", //BTM "com.bigdata.service.jini.MetadataServer", - "com.bigdata.service.jini.DataServer", +//BTM "com.bigdata.service.jini.DataServer", //BTM "com.bigdata.service.jini.LoadBalancerServer", - "com.bigdata.service.jini.ClientServer", +//BTM "com.bigdata.service.jini.ClientServer", "com.bigdata.transaction.ServiceImpl", "com.bigdata.metadata.ServiceImpl", -//BTM "com.bigdata.shard.ServiceImpl", -"com.bigdata.loadbalancer.ServiceImpl" +"com.bigdata.shard.ServiceImpl", +"com.bigdata.loadbalancer.ServiceImpl", +"com.bigdata.executor.ServiceImpl" }; @@ -1340,6 
+1341,36 @@ // log4j = "file:@NAS@/dist/bigdata/var/config/logging/loadbalancer-logging.properties"; } + +com.bigdata.executor.ServiceImpl { + + constraints = new IServiceConstraint[] { + new JiniRunningConstraint(), + new ZookeeperRunningConstraint(), + new HostAllowConstraint(bigdata.cs), + new MaxClientServicesPerHostConstraint(bigdata.maxClientServicePerHost) + }; + + args = new String[]{ + "-Xmx1600m", // was 800m + //"-XX:-UseGCOverheadLimit",//keeps VM alive even when memory starved + "-XX:+UseParallelOldGC", + //"-XX:ParallelGCThreads=8", + + "-Djava.util.logging.config.file=@NAS@/dist/bigdata/var/config/logging/executor-logging.properties", + "-Dlog4j.configuration=@NAS@/dist/bigdata/var/config/logging/executor-logging.properties", + "-Dlog4j.primary.configuration=@NAS@/dist/bigdata/var/config/logging/executor-logging.properties", + "-DusingServiceConfiguration=true", + "-Dbigdata.logDir=@NAS@/dist/bigdata/var/log", + "-DappHome=@APP_HOME@", + "-Dconfig=@NAS@/dist/bigdata/var/config/jini/executor.config" + }; + + serviceCount = bigdata.clientServiceCount; + + properties = new NV[] { + }; +} //BTM - END /** @@ -1959,7 +1990,7 @@ /* Template for matching the services to which the clients will be * distributed for execution. Normally you will specify - * IClientService as the interface to be discovered. While it is + * CallableExecutor as the interface to be discovered. While it is * possible to run tasks on a shard service or even a shard * locator service since they both implement ShardManagement, it * is generally discouraged unless the tasks require explicit @@ -1970,7 +2001,8 @@ new ServiceTemplate( null, //serviceID new Class[]{ - com.bigdata.service.IClientService.class +//BTM com.bigdata.service.IClientService.class + com.bigdata.service.CallableExecutor.class }, null // attributes ), @@ -2065,7 +2097,7 @@ /* Template for matching the services to which the clients will be * distributed for execution. 
Normally you will specify - * IClientService as the interface to be discovered. While it is + * CallableExecutor as the interface to be discovered. While it is * possible to run tasks on a shard service or even a shard * locator service since they both implement ShardManagement, it * is generally discouraged unless the tasks require explicit @@ -2076,7 +2108,8 @@ new ServiceTemplate( null, //serviceID new Class[]{ - com.bigdata.service.IClientService.class +//BTM com.bigdata.service.IClientService.class + com.bigdata.service.CallableExecutor.class }, null // attributes ), @@ -2261,7 +2294,7 @@ /* Template for matching the services to which the clients will be * distributed for execution. Normally you will specify - * IClientService as the interface to be discovered. While it is + * CallableExecutor as the interface to be discovered. While it is * possible to run tasks on a shard service or even a shard * locator service since they both implement ShardManagement, it * is generally discouraged unless the tasks require explicit @@ -2272,7 +2305,8 @@ new ServiceTemplate( null, //serviceID new Class[]{ - com.bigdata.service.IClientService.class +//BTM com.bigdata.service.IClientService.class + com.bigdata.service.CallableExecutor.class }, null), // attributes null // filter This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |