From: <btm...@us...> - 2010-10-29 12:22:25
|
Revision: 3850 http://bigdata.svn.sourceforge.net/bigdata/?rev=3850&view=rev Author: btmurphy Date: 2010-10-29 12:22:15 +0000 (Fri, 29 Oct 2010) Log Message: ----------- CHECKPOINT - phase 1 of callable executor (client) smart proxy work. Compiles cleanly, but still need to work on debugging unit tests as well as both deployment mechanisms (checkpointing for safety) Modified Paths: -------------- branches/dev-btm/bigdata/src/java/com/bigdata/bfs/BigdataFileSystem.java branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java branches/dev-btm/bigdata/src/java/com/bigdata/btree/IndexSegment.java branches/dev-btm/bigdata/src/java/com/bigdata/btree/IndexSegmentStore.java branches/dev-btm/bigdata/src/java/com/bigdata/counters/LoadBalancerReportingTask.java branches/dev-btm/bigdata/src/java/com/bigdata/counters/httpd/HttpReportingServer.java branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/ConcurrencyManager.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/IIndexManager.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/IResourceManager.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/Journal.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/LocalTransactionManager.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/TemporaryStore.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/TemporaryStoreFactory.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/AbstractRelation.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/AbstractResource.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/AbstractStepTask.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/IJoinNexus.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/IJoinNexusFactory.java 
branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/MutationTask.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/QueryTask.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinMasterTask.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/JoinTaskFactoryTask.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/JoinTaskSink.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/UnsyncDistributedOutputBuffer.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/AsynchronousOverflowTask.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/CompactingMergeTask.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/IncrementalBuildTask.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/JoinIndexPartitionTask.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/MoveTask.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/OverflowManager.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/ResourceManager.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/ScatterSplitTask.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/SplitIndexPartitionTask.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/SplitTailTask.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/ViewMetadata.java branches/dev-btm/bigdata/src/java/com/bigdata/search/FullTextIndex.java branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractFederation.java branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java 
branches/dev-btm/bigdata/src/java/com/bigdata/service/CacheOnceMetadataIndex.java branches/dev-btm/bigdata/src/java/com/bigdata/service/CachingMetadataIndex.java branches/dev-btm/bigdata/src/java/com/bigdata/service/CallableExecutor.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientService.java branches/dev-btm/bigdata/src/java/com/bigdata/service/DataService.java branches/dev-btm/bigdata/src/java/com/bigdata/service/DataTaskWrapper.java branches/dev-btm/bigdata/src/java/com/bigdata/service/EmbeddedFederation.java branches/dev-btm/bigdata/src/java/com/bigdata/service/IBigdataFederation.java branches/dev-btm/bigdata/src/java/com/bigdata/service/IClientService.java branches/dev-btm/bigdata/src/java/com/bigdata/service/IClientServiceCallable.java branches/dev-btm/bigdata/src/java/com/bigdata/service/IDataServiceCallable.java branches/dev-btm/bigdata/src/java/com/bigdata/service/IFederationCallable.java branches/dev-btm/bigdata/src/java/com/bigdata/service/IndexCache.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ListIndicesTask.java branches/dev-btm/bigdata/src/java/com/bigdata/service/MetadataIndexCache.java branches/dev-btm/bigdata/src/java/com/bigdata/service/NoCacheMetadataIndexView.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/AbstractDataServiceProcedureTask.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/AbstractScaleOutClientIndexView.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/AbstractScaleOutClientIndexView2.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/ClientIndexView.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/ClientIndexViewRefactor.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/IScaleOutClientIndex.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/KeyArrayDataServiceProcedureTask.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/KeyRangeDataServiceProcedureTask.java 
branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/PartitionedTupleIterator.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/ScaleOutIndexCounters.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/SimpleDataServiceProcedureTask.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/pipeline/AbstractPendingSetMasterStats.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/pipeline/AbstractPendingSetMasterTask.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/pipeline/AbstractRunnableMasterStats.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/pipeline/IndexAsyncWriteStats.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/pipeline/IndexWriteTask.java branches/dev-btm/bigdata/src/java/com/bigdata/util/config/ConfigurationUtil.java branches/dev-btm/bigdata/src/java/com/bigdata/util/config/Log4jLoggingHandler.java branches/dev-btm/bigdata/src/java/com/bigdata/util/config/LogUtil.java branches/dev-btm/bigdata/src/java/com/bigdata/util/config/NicUtil.java branches/dev-btm/bigdata/src/test/com/bigdata/bfs/AbstractRepositoryTestCase.java branches/dev-btm/bigdata/src/test/com/bigdata/relation/locator/TestDefaultResourceLocator.java branches/dev-btm/bigdata/src/test/com/bigdata/relation/rule/eval/TestDefaultEvaluationPlan.java branches/dev-btm/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerTestCase.java branches/dev-btm/bigdata/src/test/com/bigdata/resources/TestAddDeleteResource.java branches/dev-btm/bigdata/src/test/com/bigdata/resources/TestBuildTask2.java branches/dev-btm/bigdata/src/test/com/bigdata/resources/TestResourceManagerBootstrap.java branches/dev-btm/bigdata/src/test/com/bigdata/search/TestPrefixSearch.java branches/dev-btm/bigdata/src/test/com/bigdata/search/TestSearch.java branches/dev-btm/bigdata/src/test/com/bigdata/search/TestSearchRestartSafe.java branches/dev-btm/bigdata/src/test/com/bigdata/service/TestEmbeddedClient.java 
branches/dev-btm/bigdata/src/test/com/bigdata/service/TestEventParser.java branches/dev-btm/bigdata/src/test/com/bigdata/service/TestEventReceiver.java branches/dev-btm/bigdata/src/test/com/bigdata/striterator/TestDistinctFilter.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/AdminProxy.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/Constants.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/PrivateInterface.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceProxy.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/executor.config branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/logging.properties branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/config/BigdataServiceConfiguration.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/config/ClientServerConfiguration.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/config/DataServerConfiguration.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/config/MaxClientServicesPerHostConstraint.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/config/ServicesManagerConfiguration.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/Constants.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/EmbeddedShardLocator.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/ClientServer.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/JiniFederation.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/LoadBalancerServer.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/benchmark/ThroughputMaster.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/lookup/BigdataCachingServiceClient.java 
branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/lookup/ClientServicesClient.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/lookup/DataServicesClient.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/lookup/LoadBalancerClient.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/lookup/ServicesManagerClient.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/lookup/ShardLocatorClient.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/lookup/TransactionServiceClient.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/AbstractClientTask.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/AggregatorTask.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/DiscoverServices.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/MappedTaskMaster.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/ResourceBufferStatistics.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/ResourceBufferSubtask.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/ResourceBufferTask.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/ServiceMap.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/TaskMaster.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/DumpFederation.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/Constants.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/EmbeddedShardService.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/transaction/EmbeddedTransactionService.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java 
branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/ConfigDeployUtil.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/deploy.properties branches/dev-btm/bigdata-jini/src/java/com/bigdata/zookeeper/ZooResourceLockService.java branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/PerformanceTest.java branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/inf/BackchainOwlSameAsIterator.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/inf/OwlSameAsPropertiesExpandingIterator.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/BigdataRDFFullTextIndex.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFFileLoadTask.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/magic/MagicRelation.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/magic/TempMagicStore.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/InferenceEngine.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexusFactory.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPORelation.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractLocalTripleStore.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/LocalTripleStore.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/ScaleOutTripleStore.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/TempTripleStore.java 
branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/TripleStoreUtility.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/internal/constraints/TestInlineConstraints.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/magic/TestIRIS.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/magic/TestMagicStore.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/EDSAsyncLoader.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestRDFXMLInterchangeWithStatementIdentifiers.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rules/AbstractRuleTestCase.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestDatabaseAtOnceClosure.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestJustifications.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestOptionals.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestRuleExpansion.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestSlice.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestTruthMaintenance.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestSPORelation.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestSPOStarJoin.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/store/AbstractDistributedTripleStoreTestCase.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/store/AbstractEmbeddedTripleStoreTestCase.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/store/TestScaleOutTripleStoreWithEmbeddedFederation.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/store/TestScaleOutTripleStoreWithJiniFederation.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/store/TestTempTripleStore.java branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl2.java branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 
branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailHelper.java branches/dev-btm/bigdata-sails/src/test/com/bigdata/rdf/stress/LoadClosureAndQueryTest.java Added Paths: ----------- branches/dev-btm/bigdata/src/java/com/bigdata/discovery/ branches/dev-btm/bigdata/src/java/com/bigdata/discovery/IBigdataDiscoveryManagement.java branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSender.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/IScaleOutIndexManager.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/IScaleOutIndexStore.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/ScaleOutIndexManager.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/ILocalResourceManagement.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientTaskWrapper.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ForceOverflowTask.java branches/dev-btm/bigdata/src/java/com/bigdata/service/PurgeResourcesTask.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/EmbeddedCallableExecutor.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/BigdataDiscoveryManager.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/IJiniDiscoveryManagement.java Removed Paths: ------------- branches/dev-btm/bigdata/src/java/com/bigdata/journal/EmbeddedIndexStore.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManagement.java branches/dev-btm/bigdata/src/java/com/bigdata/service/DataServiceCallable.java branches/dev-btm/bigdata/src/java/com/bigdata/service/FederationCallable.java Modified: branches/dev-btm/bigdata/src/java/com/bigdata/bfs/BigdataFileSystem.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/bfs/BigdataFileSystem.java 2010-10-28 18:39:50 UTC (rev 3849) +++ 
branches/dev-btm/bigdata/src/java/com/bigdata/bfs/BigdataFileSystem.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -56,6 +56,10 @@ import cutthecrap.utils.striterators.Resolver; import cutthecrap.utils.striterators.Striterator; +//BTM - FOR_CLIENT_SERVICE +import com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; + /** * A distributed file system with extensible metadata and atomic append * implemented using the bigdata scale-out architecture. Files have a client @@ -346,10 +350,21 @@ * * @see Options */ - public BigdataFileSystem(IIndexManager indexManager, String namespace, - Long timestamp, Properties properties) { - - super(indexManager,namespace,timestamp,properties); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE public BigdataFileSystem(IIndexManager indexManager, String namespace, +//BTM - PRE_CLIENT_SERVICE Long timestamp, Properties properties) { +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE super(indexManager,namespace,timestamp,properties); + public BigdataFileSystem(IIndexManager indexManager, + IConcurrencyManager concurrencyManager, + IBigdataDiscoveryManagement discoveryManager, + String namespace, + Long timestamp, + Properties properties) + { + super(indexManager, concurrencyManager, discoveryManager, + namespace, timestamp, properties); +//BTM - PRE_CLIENT_SERVICE - END /* * @todo This should probably be raised directly to a property reported Modified: branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java 2010-10-28 18:39:50 UTC (rev 3849) +++ branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -35,6 +35,10 @@ import com.bigdata.journal.IIndexManager; import com.bigdata.journal.ITx; +//BTM - FOR_CLIENT_SERVICE +import 
com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; + /** * Helper class. * @@ -63,7 +67,13 @@ /** * The {@link ITx#UNISOLATED} view. */ - synchronized public BigdataFileSystem getGlobalFileSystem() { +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE synchronized public BigdataFileSystem getGlobalFileSystem() { + synchronized public BigdataFileSystem getGlobalFileSystem + (final IConcurrencyManager concurrencyManager, + final IBigdataDiscoveryManagement discoveryManager) + { +//BTM - PRE_CLIENT_SERVICE - END if (INFO) log.info(""); @@ -71,9 +81,18 @@ if (globalRowStore == null) { // setup the repository view. - globalRowStore = new BigdataFileSystem(indexManager, - GLOBAL_FILE_SYSTEM_NAMESPACE, ITx.UNISOLATED, - new Properties()); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE globalRowStore = new BigdataFileSystem(indexManager, +//BTM - PRE_CLIENT_SERVICE GLOBAL_FILE_SYSTEM_NAMESPACE, ITx.UNISOLATED, +//BTM - PRE_CLIENT_SERVICE new Properties()); + globalRowStore = + new BigdataFileSystem(indexManager, + concurrencyManager, + discoveryManager, + GLOBAL_FILE_SYSTEM_NAMESPACE, + ITx.UNISOLATED, + new Properties()); +//BTM - PRE_CLIENT_SERVICE - END // register the indices. 
globalRowStore.create(); Modified: branches/dev-btm/bigdata/src/java/com/bigdata/btree/IndexSegment.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/btree/IndexSegment.java 2010-10-28 18:39:50 UTC (rev 3849) +++ branches/dev-btm/bigdata/src/java/com/bigdata/btree/IndexSegment.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -308,18 +308,33 @@ fileStore.lock.lock(); try { - if (fileStore.fed != null) { - -//BTM openCloseEvent = new Event(fileStore.fed, new EventResource( -//BTM fileStore.getIndexMetadata(), fileStore.file), -//BTM EventType.IndexSegmentOpenClose); -openCloseEvent = new Event( (fileStore.fed).getEventQueue(), - (fileStore.fed).getServiceIface(), - (fileStore.fed).getServiceName(), - (fileStore.fed).getServiceUUID(), - new EventResource(fileStore.getIndexMetadata(), fileStore.file), - EventType.IndexSegmentOpenClose); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE if (fileStore.fed != null) { +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE //BTM openCloseEvent = new Event(fileStore.fed, new EventResource( +//BTM - PRE_CLIENT_SERVICE //BTM fileStore.getIndexMetadata(), fileStore.file), +//BTM - PRE_CLIENT_SERVICE //BTM EventType.IndexSegmentOpenClose); +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE openCloseEvent = new Event( (fileStore.fed).getEventQueue(), +//BTM - PRE_CLIENT_SERVICE (fileStore.fed).getServiceIface(), +//BTM - PRE_CLIENT_SERVICE (fileStore.fed).getServiceName(), +//BTM - PRE_CLIENT_SERVICE (fileStore.fed).getServiceUUID(), +//BTM - PRE_CLIENT_SERVICE new EventResource(fileStore.getIndexMetadata(), fileStore.file), +//BTM - PRE_CLIENT_SERVICE EventType.IndexSegmentOpenClose); +//BTM - PRE_CLIENT_SERVICE } +//BTM - PRE_CLIENT_SERVICE + if (fileStore.localResourceManager != null) { + openCloseEvent = + new Event + ( (fileStore.localResourceManager).getEventQueueSender(), + (fileStore.localResourceManager).getServiceIface(), + 
(fileStore.localResourceManager).getServiceName(), + (fileStore.localResourceManager).getServiceUUID(), + new EventResource(fileStore.getIndexMetadata(), + fileStore.file), + EventType.IndexSegmentOpenClose); } +//BTM - PRE_CLIENT_SERVICE - END if (!fileStore.isOpen()) { Modified: branches/dev-btm/bigdata/src/java/com/bigdata/btree/IndexSegmentStore.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/btree/IndexSegmentStore.java 2010-10-28 18:39:50 UTC (rev 3849) +++ branches/dev-btm/bigdata/src/java/com/bigdata/btree/IndexSegmentStore.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -59,9 +59,12 @@ import com.bigdata.service.Event; import com.bigdata.service.EventResource; import com.bigdata.service.EventType; -import com.bigdata.service.IBigdataFederation; +//BTM - PRE_CLIENT_SERVICE import com.bigdata.service.IBigdataFederation; import com.bigdata.service.ResourceService; +//BTM - FOR_CLIENT_SERVICE +import com.bigdata.resources.ILocalResourceManagement; + /** * A read-only store backed by a file containing a single {@link IndexSegment}. * @@ -274,7 +277,10 @@ /** * Optional. When defined, {@link Event}s are reported out. 
*/ - protected final IBigdataFederation<?> fed; +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE protected final IBigdataFederation<?> fed; + protected final ILocalResourceManagement localResourceManager; +//BTM - PRE_CLIENT_SERVICE - END private volatile Event openCloseEvent; /** @@ -299,7 +305,11 @@ */ public IndexSegmentStore(final File file) { - this(file, null/* fed */); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE this(file, null/* fed */); + this(file, + null);//ILocalResourceManagement +//BTM - PRE_CLIENT_SERVICE - END } @@ -310,15 +320,27 @@ * @param file * @param fed */ - public IndexSegmentStore(final File file, final IBigdataFederation<?> fed) { - - if (file == null) - throw new IllegalArgumentException(); - +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE public IndexSegmentStore(final File file, final IBigdataFederation<?> fed) { +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE if (file == null) +//BTM - PRE_CLIENT_SERVICE throw new IllegalArgumentException(); +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE this.file = file; +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE // MAY be null. +//BTM - PRE_CLIENT_SERVICE this.fed = fed; +//BTM - PRE_CLIENT_SERVICE + public IndexSegmentStore + (final File file, + final ILocalResourceManagement localResourceManager) + { + if (file == null) { + throw new NullPointerException("null file"); + } this.file = file; - - // MAY be null. 
- this.fed = fed; + this.localResourceManager = localResourceManager;//can be null +//BTM - PRE_CLIENT_SERVICE - END /* * Mark as open so that we can use reopenChannel() and read(long addr) @@ -445,16 +467,32 @@ counters.openCount++; - if (fed != null) { - -//BTM openCloseEvent = new Event(fed, new EventResource( -//BTM indexMetadata, file), -//BTM EventType.IndexSegmentStoreOpenClose).start(); -openCloseEvent = new Event( fed.getEventQueue(), fed.getServiceIface(), fed.getServiceName(), fed.getServiceUUID(), - new EventResource(indexMetadata, file), - EventType.IndexSegmentStoreOpenClose).start(); - +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE if (fed != null) { +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE //BTM openCloseEvent = new Event(fed, new EventResource( +//BTM - PRE_CLIENT_SERVICE //BTM indexMetadata, file), +//BTM - PRE_CLIENT_SERVICE //BTM EventType.IndexSegmentStoreOpenClose).start(); +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE openCloseEvent = new Event( fed.getEventQueue(), +//BTM - PRE_CLIENT_SERVICE fed.getServiceIface(), +//BTM - PRE_CLIENT_SERVICE fed.getServiceName(), +//BTM - PRE_CLIENT_SERVICE fed.getServiceUUID(), +//BTM - PRE_CLIENT_SERVICE new EventResource(indexMetadata, file), +//BTM - PRE_CLIENT_SERVICE EventType.IndexSegmentStoreOpenClose).start(); +//BTM - PRE_CLIENT_SERVICE } +//BTM - PRE_CLIENT_SERVICE + if (localResourceManager != null) { + openCloseEvent = + new Event + ( localResourceManager.getEventQueueSender(), + localResourceManager.getServiceIface(), + localResourceManager.getServiceName(), + localResourceManager.getServiceUUID(), + new EventResource(indexMetadata, file), + EventType.IndexSegmentStoreOpenClose).start(); } +//BTM - PRE_CLIENT_SERVICE - END } catch (Throwable t) { Modified: branches/dev-btm/bigdata/src/java/com/bigdata/counters/LoadBalancerReportingTask.java =================================================================== --- 
branches/dev-btm/bigdata/src/java/com/bigdata/counters/LoadBalancerReportingTask.java 2010-10-28 18:39:50 UTC (rev 3849) +++ branches/dev-btm/bigdata/src/java/com/bigdata/counters/LoadBalancerReportingTask.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -25,14 +25,14 @@ package com.bigdata.counters; import com.bigdata.counters.CounterSet; +import com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; import com.bigdata.rawstore.Bytes; -import com.bigdata.service.IFederationDelegate; +import com.bigdata.resources.ILocalResourceManagement; +import com.bigdata.resources.ResourceManager; import com.bigdata.service.LoadBalancer; import com.bigdata.util.config.LogUtil; -import net.jini.core.lookup.ServiceItem; -import net.jini.lookup.LookupCache; - import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -47,34 +47,32 @@ */ public class LoadBalancerReportingTask implements Runnable { - private IFederationDelegate embeddedIndexStore; - private UUID serviceUUID; - private CounterSet serviceRoot; - private LookupCache lbsCache;//for discovering lbs - private LoadBalancer embeddedLbs;//for testing embedded fed + private ResourceManager resourceMgr; + private IConcurrencyManager concurrencyMgr; + private ILocalResourceManagement localResourceMgr; + private IBigdataDiscoveryManagement discoveryMgr; private Logger logger; public LoadBalancerReportingTask - (IFederationDelegate embeddedIndexStore, - UUID serviceUUID, - CounterSet serviceRoot, - LookupCache loadBalancerCache, - LoadBalancer embeddedLoadBalancer, + (ResourceManager resourceMgr, + IConcurrencyManager concurrencyMgr, + ILocalResourceManagement localResourceMgr, + IBigdataDiscoveryManagement discoveryMgr, Logger logger) { - this.embeddedIndexStore = embeddedIndexStore; - this.serviceUUID = serviceUUID; - this.serviceRoot = serviceRoot; - this.lbsCache = loadBalancerCache; - this.embeddedLbs = embeddedLoadBalancer;//for embedded fed testing - this.logger = (logger == 
null ? - LogUtil.getLog4jLogger((this.getClass()).getName()) : - logger); + this.resourceMgr = resourceMgr; + this.concurrencyMgr = concurrencyMgr; + this.localResourceMgr = localResourceMgr; + this.discoveryMgr = discoveryMgr; + this.logger = (logger == null ? + LogUtil.getLog4jLogger((this.getClass()).getName()) : + logger); } public void run() { try { - embeddedIndexStore.reattachDynamicCounters(); + localResourceMgr.reattachDynamicCounters + (resourceMgr, concurrencyMgr); } catch (Throwable t) { logger.error ("failure on dynamic counter reattachment ["+t+"]", t); @@ -89,32 +87,22 @@ } private void reportPerformanceCounters() throws IOException { -System.out.println("\n>>>>> LoadBalancerReportingTask.reportPerformanceCounters: serviceUUID = "+serviceUUID); - LoadBalancer lbs = null; - if(embeddedLbs != null) { - lbs = embeddedLbs; - } else { - if(lbsCache != null) { - ServiceItem lbsItem = lbsCache.lookup(null); - if(lbsItem != null) { - lbs = (LoadBalancer)(lbsItem.service); - } - } - } +//BTM +System.out.println("\n>>>>> LoadBalancerReportingTask.reportPerformanceCounters: serviceUUID = "+localResourceMgr.getServiceUUID()); + LoadBalancer lbs = discoveryMgr.getLoadBalancerService(); if(lbs == null) { logger.warn ("cannot report counters [no load balancer service]"); System.out.println(">>>>> LoadBalancerReportingTask.reportPerformanceCounters: loadBalancerService = NULL"); return; } - System.out.println(">>>>> LoadBalancerReportingTask.reportPerformanceCounters: loadBalancerService = "+lbs); - ByteArrayOutputStream baos = new ByteArrayOutputStream(Bytes.kilobyte32 * 2); - serviceRoot.asXML(baos, "UTF-8", null/* filter */); - + (localResourceMgr.getServiceCounterSet()).asXML(baos, + "UTF-8", + null);//filter System.out.println(">>>>> LoadBalancerReportingTask.reportPerformanceCounters: CALLING loadBalancer.notify ..."); - lbs.notify(serviceUUID, baos.toByteArray()); + lbs.notify(localResourceMgr.getServiceUUID(), baos.toByteArray()); System.out.println(">>>>> 
LoadBalancerReportingTask.reportPerformanceCounters: DONE CALLING loadBalancer.notify"); if (logger.isDebugEnabled()) { Modified: branches/dev-btm/bigdata/src/java/com/bigdata/counters/httpd/HttpReportingServer.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/counters/httpd/HttpReportingServer.java 2010-10-28 18:39:50 UTC (rev 3849) +++ branches/dev-btm/bigdata/src/java/com/bigdata/counters/httpd/HttpReportingServer.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -25,7 +25,9 @@ package com.bigdata.counters.httpd; import com.bigdata.counters.CounterSet; -import com.bigdata.service.IFederationDelegate; +import com.bigdata.journal.IConcurrencyManager; +import com.bigdata.resources.ILocalResourceManagement; +import com.bigdata.resources.ResourceManager; import com.bigdata.util.config.LogUtil; import org.apache.log4j.Logger; @@ -41,18 +43,23 @@ */ public class HttpReportingServer extends CounterSetHTTPD { - private IFederationDelegate embeddedIndexStore; + private ResourceManager resourceMgr; + private IConcurrencyManager concurrencyMgr; + private ILocalResourceManagement localResourceMgr; private Logger logger; public HttpReportingServer (final int port, - final CounterSet root, - final IFederationDelegate embeddedIndexStore, + final ResourceManager resourceMgr, + final IConcurrencyManager concurrencyMgr, + final ILocalResourceManagement localResourceMgr, Logger logger) throws IOException { - super(port, root); - this.embeddedIndexStore = embeddedIndexStore; + super(port, localResourceMgr.getServiceCounterSet()); + this.resourceMgr = resourceMgr; + this.concurrencyMgr = concurrencyMgr; + this.localResourceMgr = localResourceMgr; this.logger = (logger == null ? 
LogUtil.getLog4jLogger((this.getClass()).getName()) : logger); @@ -67,7 +74,8 @@ throws Exception { try { - embeddedIndexStore.reattachDynamicCounters(); + localResourceMgr.reattachDynamicCounters + (resourceMgr, concurrencyMgr); } catch (Exception e) { // Usually because the live journal has been // concurrently closed during the request. Added: branches/dev-btm/bigdata/src/java/com/bigdata/discovery/IBigdataDiscoveryManagement.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/discovery/IBigdataDiscoveryManagement.java (rev 0) +++ branches/dev-btm/bigdata/src/java/com/bigdata/discovery/IBigdataDiscoveryManagement.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -0,0 +1,157 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +package com.bigdata.discovery; + +import com.bigdata.journal.TransactionService; +import com.bigdata.service.LoadBalancer; +import com.bigdata.service.ShardLocator; +import com.bigdata.service.ShardService; + +import java.util.UUID; + +/** + * Defines the interface for implementations that manage the discovery + * of the services in a Bigdata federation. 
+ */ +public interface IBigdataDiscoveryManagement { + + /** + * Returns a reference to a transaction service; or <code>null</code> + * if such a service has not been discovered. + * + * @return reference to a transaction service; or <code>null</code> + * if such a service has not been discovered. + */ + TransactionService getTransactionService(); + + /** + * Returns a reference to a load balancer service; or <code>null</code> + * if such a service has not been discovered. + * + * @return reference to a load balancer service; or <code>null</code> + * if such a service has not been discovered. + */ + LoadBalancer getLoadBalancerService(); + + /** + * Returns a reference to a shard locator (metadata) service; or + * <code>null</code> if such a service has not been discovered. + * + * @return reference to a shard locator (metadata) service; or + * <code>null</code> if such a service has not been discovered. + */ + ShardLocator getMetadataService(); + + /** + * Returns an array whose elements are the UUIDs corresponding to + * a set of discovered shard (data) service(s). + * + * @param maxCount The maximum number of elements to return; where + * the number returned may be less than the value + * specified for <code>maxCount</code>, but will + * not be greater. Note that when zero (0) is + * input for this parameter, the UUIDs of all + * discovered shard (data) service(s) will be + * returned. + * + * @return An array of {@link UUID}s for data services. + * + * @throws IllegalArgumentException when a negative is input for + * the <code>maxCount</code> parameter. + */ + UUID[] getDataServiceUUIDs(int maxCount); + + /** + * Returns an array whose elements are references to shard (data) + * service(s) having UUID corresponding to an element of the + * <code>uuids</code> parameter. If no shard services exist (or can + * be discovered) that satisfy the given criteria, then an + * empty array is returned. 
+ * + * @param uuids array whose elements are the UUIDs of the shard (data) + * service(s) to discover and return. + * + * @return array whose elements are references to shard (data) + * service(s) having UUID corresponding to an element of + * the <code>uuids</code> parameter; or <code>null</code> + * if no shard services exist (or can be discovered) that + * satisfy the given criteria. + * + * @throws NullPointerException if <code>null</code> is input for the + * <code>uuids</code> parameter. + */ + ShardService[] getDataServices(UUID[] uuids); + + /** + * Returns a reference to the shard (data) service whose corresponding + * UUID equals the <code>uuid</code> parameter. If no shard service + * exists (or can be discovered) that satisfies the given criteria, + * then <code>null</code> is returned. + * + * @param uuid the UUID of the shard (data) service to discover and + * return. + * + * @return reference to the shard (data) service whose corresponding + * UUID equals the <code>uuid</code> parameter; or + * <code>null</code> if no shard services exist (or can be + * discovered) that satisfy the given criteria. + * + * @throws NullPointerException if <code>null</code> is input for the + * <code>uuid</code> parameter. + */ + ShardService getDataService(UUID uuid); + + /** + * Returns a reference to the shard (data) service; where the criteria + * used to choose the service whose reference is returned is + * implementation-dependent. If no shard service exists (or can be + * discovered), then <code>null</code> is returned. + * + * @return reference to the shard (data) service; where the criteria + * used to choose the service whose reference is returned is + * implementation-dependent; or <code>null</code> if no shard + * services exist (or can be discovered). + */ + ShardService getAnyDataService(); + + /** + * Returns a reference to the shard (data) service whose corresponding + * name attribute equals the value input for the <code>name</code> parameter. 
+ * If no shard service exists (or can be discovered) that satisfies + * the given criteria, then <code>null</code> is returned. + * + * @param name the value of the name attribute for the shard (data) + * service to discover and return. + * + * @return reference to the shard (data) service whose corresponding + * name attribute equals the value input for the <code>name</code> + * parameter; or <code>null</code> if no shard services exist + * (or can be discovered) that satisfy the given criteria. + * + * @throws NullPointerException if <code>null</code> is input for the + * <code>name</code> parameter. + */ + ShardService getDataServiceByName(String name); +} Property changes on: branches/dev-btm/bigdata/src/java/com/bigdata/discovery/IBigdataDiscoveryManagement.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSender.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSender.java (rev 0) +++ branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSender.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -0,0 +1,35 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ + +package com.bigdata.event; + +/** + * Convenience interface that allows one to specify objects + * that can be used to both queue events for sending, and + * execute as tasks -- asynchronously -- that send the + * currently queued events to the desired event receiver. + */ +public interface EventQueueSender extends EventQueue, Runnable { +} Property changes on: branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSender.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java 2010-10-28 18:39:50 UTC (rev 3849) +++ branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -25,37 +25,31 @@ package com.bigdata.event; +import com.bigdata.discovery.IBigdataDiscoveryManagement; import com.bigdata.service.Event; import com.bigdata.service.EventReceivingService; -import com.bigdata.service.LoadBalancer; import com.bigdata.util.config.LogUtil; -import net.jini.core.lookup.ServiceItem; -import net.jini.lookup.LookupCache; - import org.apache.log4j.Level; import org.apache.log4j.Logger; import java.util.LinkedList; import java.util.concurrent.BlockingQueue; -public class EventQueueSenderTask implements EventQueue, Runnable { +public class EventQueueSenderTask implements EventQueueSender { private BlockingQueue<Event> eventQueue; - private LookupCache lbsCache;//for discovering lbs - private LoadBalancer embeddedLbs;//for testing embedded fed + private IBigdataDiscoveryManagement discoveryMgr; private 
String serviceName; private Logger logger; public EventQueueSenderTask(BlockingQueue<Event> eventQueue, - LookupCache loadBalancerCache, - LoadBalancer embeddedLoadBalancer, + IBigdataDiscoveryManagement discoveryMgr, String serviceName, Logger logger) { this.eventQueue = eventQueue; - this.lbsCache = loadBalancerCache; - this.embeddedLbs = embeddedLoadBalancer;//for embedded fed testing + this.discoveryMgr = discoveryMgr; this.serviceName = serviceName;//for debug output this.logger = (logger == null ? @@ -63,6 +57,9 @@ logger); } + // Note: EventQueueSender interface sub-classes EventQueue interface + // and Runnable interface + // Required by EventQueue interface public void queueEvent(Event e) { @@ -75,22 +72,9 @@ //BTM - for now, maintain the same logic and functionality as that in //BTM the class AbstractFederation#SendEventsTask - try { - LoadBalancer lbs = null; - EventReceivingService serviceRef = null; - if(embeddedLbs != null) { - lbs = embeddedLbs; - } else { - if(lbsCache != null) { - ServiceItem lbsItem = lbsCache.lookup(null); - if(lbsItem != null) { - lbs = (LoadBalancer)(lbsItem.service); - } - } - } - if(lbs == null) return; - serviceRef = (EventReceivingService)lbs; + EventReceivingService serviceRef = + (EventReceivingService)(discoveryMgr.getLoadBalancerService()); final long begin = System.currentTimeMillis();//for logging Modified: branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-10-28 18:39:50 UTC (rev 3849) +++ branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -2323,8 +2323,13 @@ && isResource(namespace + "."+BigdataFileSystem.FILE_DATA_INDEX_BASENAME)) { // unisolated view - will create if it does not exist. 
- return new GlobalFileSystemHelper(this).getGlobalFileSystem(); - +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE return new GlobalFileSystemHelper(this).getGlobalFileSystem(); + return new GlobalFileSystemHelper(this) + .getGlobalFileSystem + (concurrencyManager, + resourceManager.getDiscoveryManager()); +//BTM - PRE_CLIENT_SERVICE - END } // read committed view IFF it exists otherwise [null] @@ -2353,8 +2358,12 @@ */ public TemporaryStore getTempStore() { - return tempStoreFactory.getTempStore(); - +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE return tempStoreFactory.getTempStore(); + return tempStoreFactory.getTempStore + (concurrencyManager, + resourceManager.getDiscoveryManager()); +//BTM - PRE_CLIENT_SERVICE - END } private TemporaryStoreFactory tempStoreFactory = new TemporaryStoreFactory(); @@ -2727,8 +2736,12 @@ */ public TemporaryStore getTempStore() { - return tempStoreFactory.getTempStore(); - +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE return tempStoreFactory.getTempStore(); + return tempStoreFactory.getTempStore + (concurrencyManager, + resourceManager.getDiscoveryManager()); +//BTM - PRE_CLIENT_SERVICE - END } private TemporaryStoreFactory tempStoreFactory = new TemporaryStoreFactory(); Modified: branches/dev-btm/bigdata/src/java/com/bigdata/journal/ConcurrencyManager.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/journal/ConcurrencyManager.java 2010-10-28 18:39:50 UTC (rev 3849) +++ branches/dev-btm/bigdata/src/java/com/bigdata/journal/ConcurrencyManager.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -38,6 +38,9 @@ import com.bigdata.util.concurrent.ThreadPoolExecutorStatisticsTask; import com.bigdata.util.concurrent.WriteTaskCounters; +//BTM - FOR_CLIENT_SERVICE +import com.bigdata.journal.IScaleOutIndexStore; + /** * Supports concurrent operations against named indices. 
Historical read and * read-committed tasks run with full concurrency. For unisolated tasks, the @@ -1228,7 +1231,8 @@ // And even then only for the distributed federation try { - if (!(resourceManager.getFederation() instanceof AbstractDistributedFederation)) { +//BTM - PRE_CLIENT_SERVICE if (!(resourceManager.getFederation() instanceof AbstractDistributedFederation)) { + if (!(resourceManager.getIndexManager() instanceof IScaleOutIndexStore)) { return 0; } } catch (UnsupportedOperationException ex) { Deleted: branches/dev-btm/bigdata/src/java/com/bigdata/journal/EmbeddedIndexStore.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/journal/EmbeddedIndexStore.java 2010-10-28 18:39:50 UTC (rev 3849) +++ branches/dev-btm/bigdata/src/java/com/bigdata/journal/EmbeddedIndexStore.java 2010-10-29 12:22:15 UTC (rev 3850) @@ -1,799 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. 
- -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ - -package com.bigdata.journal; - -import com.bigdata.bfs.BigdataFileSystem; -import com.bigdata.btree.IndexMetadata; -import com.bigdata.counters.CounterSet; -import com.bigdata.counters.ICounterSet; -import com.bigdata.counters.IDataServiceCounters; -import com.bigdata.counters.Instrument; -import com.bigdata.counters.IProcessCounters; -import com.bigdata.counters.IStatisticsCollector; -import com.bigdata.counters.ReadBlockCounters; -import com.bigdata.event.EventQueue; -import com.bigdata.io.DirectBufferPool; -import com.bigdata.jini.util.JiniUtil; -import com.bigdata.journal.ConcurrencyManager; -import com.bigdata.journal.ConcurrencyManager.IConcurrencyManagerCounters; -import com.bigdata.journal.IResourceLockService; -import com.bigdata.journal.LocalTransactionManager; -import com.bigdata.journal.TemporaryStore; -import com.bigdata.journal.TemporaryStoreFactory; -import com.bigdata.journal.TransactionService; -import com.bigdata.mdi.IMetadataIndex; -import com.bigdata.relation.locator.DefaultResourceLocator; -import com.bigdata.relation.locator.IResourceLocator; -import com.bigdata.resources.IndexManager.IIndexManagerCounters; -import com.bigdata.resources.LocalResourceManagement; -import com.bigdata.resources.ResourceManager; -import com.bigdata.resources.ResourceManager.IResourceManagerCounters; -import com.bigdata.service.IBigdataClient; -import com.bigdata.service.IBigdataFederation; -import com.bigdata.service.IService; -import com.bigdata.service.IServiceShutdown; -import com.bigdata.service.LoadBalancer; -import com.bigdata.service.MetadataIndexCache; -import com.bigdata.service.MetadataIndexCachePolicy; -import com.bigdata.service.ShardLocator; -import com.bigdata.service.Service; -import com.bigdata.service.ShardService; -import 
com.bigdata.service.ndx.IClientIndex; -import com.bigdata.sparse.SparseRowStore; -import com.bigdata.util.Util; -import com.bigdata.util.config.LogUtil; -import com.bigdata.util.httpd.AbstractHTTPD; - -import net.jini.core.lookup.ServiceID; -import net.jini.core.lookup.ServiceItem; -import net.jini.lookup.LookupCache; - -import org.apache.log4j.Logger; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.io.Writer; -import java.util.HashSet; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -//NOTE: replace IBigdataFederation with IIndexStore when -// StoreManager.getResourceLocator is changed to no longer -// call getFederation -// -// IBigdataFederation extends IIndexManager and IFederationDelegate -// IIndexManager extends IIndexStore -public class EmbeddedIndexStore<T> implements IBigdataFederation<T> { - - public static Logger logger = - LogUtil.getLog4jLogger((EmbeddedIndexStore.class).getName()); - - private UUID serviceUUID; - private Class serviceType; - private String serviceName; - private String hostname; - private LocalResourceManagement embeddedBackend; - private CounterSet countersRoot; - private IStatisticsCollector statisticsCollector; - private Properties properties; - private ReadBlockCounters readBlockCounters; - private LocalTransactionManager localTxnMgr; - private EventQueue eventQueue; - private ExecutorService embeddedThreadPool; - private String httpServerUrl; - private DefaultResourceLocator resourceLocator; - - private IServiceShutdown service; - private ResourceManager resourceMgr; - private ConcurrencyManager concurrencyMgr; - - private long lastReattachMillis = 0L; - private TemporaryStoreFactory tempStoreFactory; - - private MetadataIndexCache metadataIndexCache; - -private LookupCache lbsServiceCache; -private LookupCache 
mdsServiceCache; -private LookupCache shardCache; -private LookupCache remoteShardCache; - -private LoadBalancer embeddedLbs; -private ShardLocator embeddedMds; -private Map<UUID, ShardService> embeddedDsMap; - - public EmbeddedIndexStore - (UUID serviceUUID, - Class serviceType, - String serviceName, - String hostname, -LookupCache lbsServiceCache, -LookupCache mdsServiceCache, -LookupCache shardCache, -LookupCache remoteShardCache, -LoadBalancer embeddedLbs, - LocalResourceManagement embeddedBackend, - TemporaryStoreFactory tempStoreFactory, - int indexCacheSize, - long indexCacheTimeout, - MetadataIndexCachePolicy metadataIndexCachePolicy, - int resourceLocatorCacheSize, - long resourceLocatorCacheTimeout, - CounterSet countersRoot, - IStatisticsCollector statisticsCollector, - Properties properties, - ReadBlockCounters readBlockCounters, - LocalTransactionManager localTxnMgr, - EventQueue eventQueue, - ExecutorService embeddedThreadPool, - String httpServerUrl) - { - this.serviceUUID = serviceUUID; - this.serviceType = serviceType; - this.serviceName = serviceName; - this.hostname = hostname; -this.lbsServiceCache = lbsServiceCache; -this.mdsServiceCache = mdsServiceCache; -this.shardCache = shardCache; -this.remoteShardCache = remoteShardCac... [truncated message content] |
From: <btm...@us...> - 2010-11-08 23:56:10
|
Revision: 3919 http://bigdata.svn.sourceforge.net/bigdata/?rev=3919&view=rev Author: btmurphy Date: 2010-11-08 23:56:01 +0000 (Mon, 08 Nov 2010) Log Message: ----------- [branch dev-btm]: CHECKPOINT FOR SAFETY - phase 1 of callable executor (client service) smart proxy work. Includes fixes to issues identified by the tests; in particular the com.bigdata.service and com.bigdata.jini tests Modified Paths: -------------- branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java branches/dev-btm/bigdata/src/java/com/bigdata/journal/ScaleOutIndexManager.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/AbstractResource.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/IResourceLocator.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/ResourceLocatorMap.java branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/AbstractStepTask.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractFederation.java branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractIndexCache.java branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientService.java branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientTaskWrapper.java branches/dev-btm/bigdata/src/java/com/bigdata/service/IClientServiceCallable.java branches/dev-btm/bigdata/src/java/com/bigdata/service/IndexCache.java branches/dev-btm/bigdata/src/java/com/bigdata/service/MetadataService.java 
branches/dev-btm/bigdata/src/java/com/bigdata/service/ndx/ClientIndexView.java branches/dev-btm/bigdata/src/test/com/bigdata/relation/locator/TestDefaultResourceLocator.java branches/dev-btm/bigdata/src/test/com/bigdata/service/AbstractEmbeddedFederationTestCase.java branches/dev-btm/bigdata/src/test/com/bigdata/service/TestEDS.java branches/dev-btm/bigdata/src/test/com/bigdata/service/TestEmbeddedClient.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/Constants.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/EmbeddedCallableExecutor.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/config/ZookeeperServerConfiguration.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/start/process/ZookeeperProcessHelper.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/loadbalancer/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/Constants.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/EmbeddedShardLocator.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/AbstractServer.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/DataServer.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/JiniClient.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/lookup/ServiceCache.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/AbstractClientTask.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/ResourceBufferTask.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/master/TaskMaster.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/Constants.java 
branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/transaction/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/ConfigDeployUtil.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/zookeeper/ZooHelper.java branches/dev-btm/bigdata-jini/src/resources/config/bigdataStandaloneTesting.config branches/dev-btm/bigdata-jini/src/test/com/bigdata/jini/start/AbstractFedZooTestCase.java branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestAll.java branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/TestBigdataClient.java branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestAll.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFFileLoadTask.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/magic/IRISUtils.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/magic/MagicAccessPath.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/magic/TempMagicStore.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/AsynchronousStatementBufferFactory.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/AbstractRuleFastClosure_3_5_6_7_9.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOAccessPath.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPORelation.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/TripleStoreUtility.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/EDSAsyncLoader.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/store/StressTestCentos.java 
branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalTripleStoreTransactionSemantics.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/store/TestRelationLocator.java branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailHelper.java branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java branches/dev-btm/bigdata-sails/src/test/com/bigdata/rdf/stress/LoadClosureAndQueryTest.java Modified: branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/bfs/GlobalFileSystemHelper.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -107,14 +107,30 @@ /** * {@link ITx#READ_COMMITTED} view. */ - public BigdataFileSystem getReadCommitted() { - - if (INFO) +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE public BigdataFileSystem getReadCommitted() { +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE if (INFO) +//BTM - PRE_CLIENT_SERVICE log.info(""); +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE return (BigdataFileSystem) indexManager.getResourceLocator().locate( +//BTM - PRE_CLIENT_SERVICE GLOBAL_FILE_SYSTEM_NAMESPACE, ITx.READ_COMMITTED); +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE } + public BigdataFileSystem getReadCommitted + (IConcurrencyManager concurrencyManager, + IBigdataDiscoveryManagement discoveryManager) + { + if (INFO) { log.info(""); - - return (BigdataFileSystem) indexManager.getResourceLocator().locate( - GLOBAL_FILE_SYSTEM_NAMESPACE, ITx.READ_COMMITTED); - + } + return (BigdataFileSystem) indexManager.getResourceLocator() + .locate( indexManager, + concurrencyManager, + discoveryManager, + GLOBAL_FILE_SYSTEM_NAMESPACE, + 
ITx.READ_COMMITTED ); } +//BTM - PRE_CLIENT_SERVICE - END } Modified: branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/event/EventQueueSenderTask.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -75,7 +75,12 @@ try { EventReceivingService serviceRef = (EventReceivingService)(discoveryMgr.getLoadBalancerService()); - + if (serviceRef == null) { + logger.log(Level.WARN, "cannot send events to load " + +"balancer from "+serviceName + +" - load balancer unavailable"); + return; + } final long begin = System.currentTimeMillis();//for logging final LinkedList<Event> queuedEvents = new LinkedList<Event>(); Modified: branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -2325,15 +2325,22 @@ // unisolated view - will create if it does not exist. 
//BTM - PRE_CLIENT_SERVICE - BEGIN //BTM - PRE_CLIENT_SERVICE return new GlobalFileSystemHelper(this).getGlobalFileSystem(); +//BTM - PRE_CLIENT_SERVICE } +//BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE // read committed view IFF it exists otherwise [null] +//BTM - PRE_CLIENT_SERVICE return new GlobalFileSystemHelper(this).getReadCommitted(); return new GlobalFileSystemHelper(this) .getGlobalFileSystem (concurrencyManager, resourceManager.getDiscoveryManager()); -//BTM - PRE_CLIENT_SERVICE - END } // read committed view IFF it exists otherwise [null] - return new GlobalFileSystemHelper(this).getReadCommitted(); + return new GlobalFileSystemHelper(this) + .getReadCommitted + (concurrencyManager, + resourceManager.getDiscoveryManager()); +//BTM - PRE_CLIENT_SERVICE - END } @@ -2711,7 +2718,13 @@ }; - return new GlobalFileSystemHelper(tmp).getReadCommitted(); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE return new GlobalFileSystemHelper(tmp).getReadCommitted(); + return new GlobalFileSystemHelper(tmp) + .getReadCommitted + (concurrencyManager, + resourceManager.getDiscoveryManager()); +//BTM - PRE_CLIENT_SERVICE - END } Modified: branches/dev-btm/bigdata/src/java/com/bigdata/journal/ScaleOutIndexManager.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/journal/ScaleOutIndexManager.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/journal/ScaleOutIndexManager.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -276,7 +276,7 @@ tempStoreFactory.closeAll(); } - // Required IIndexManager + // Required by IIndexManager public void registerIndex(IndexMetadata metadata) { registerIndex(metadata, null); Modified: branches/dev-btm/bigdata/src/java/com/bigdata/relation/AbstractResource.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/relation/AbstractResource.java 2010-11-08 
21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/relation/AbstractResource.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -542,9 +542,18 @@ } - container = getIndexManager() - .getResourceLocator() - .locate(getContainerNamespace(), getTimestamp()); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE container = getIndexManager() +//BTM - PRE_CLIENT_SERVICE .getResourceLocator() +//BTM - PRE_CLIENT_SERVICE .locate(getContainerNamespace(), getTimestamp()); + container = + getIndexManager().getResourceLocator() + .locate( getIndexManager(), + getConcurrencyManager(), + getDiscoveryManager(), + getContainerNamespace(), + getTimestamp() ); +//BTM - PRE_CLIENT_SERVICE - END } Modified: branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/DefaultResourceLocator.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -52,6 +52,10 @@ import com.bigdata.service.IBigdataFederation; import com.bigdata.sparse.SparseRowStore; +//BTM - FOR_CLIENT_SERVICE +import com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; + /** * Generic implementation relies on a ctor for the resource with the following * method signature: @@ -181,7 +185,15 @@ } // @todo hotspot 2% total query time. 
- public T locate(final String namespace, final long timestamp) { +//BTM - FOR_CLIENT_SERVICE - BEGIN +//BTM - FOR_CLIENT_SERVICE public T locate(final String namespace, final long timestamp) { + public T locate(IIndexManager indexManager, + IConcurrencyManager concurrencyManager, + IBigdataDiscoveryManagement discoveryManager, + final String namespace, + final long timestamp) + { +//BTM - FOR_CLIENT_SERVICE - END if (namespace == null) throw new IllegalArgumentException(); @@ -258,7 +270,14 @@ } // pass request to delegate. - resource = delegate.locate(namespace, timestamp); +//BTM - FOR_CLIENT_SERVICE - BEGIN +//BTM - FOR_CLIENT_SERVICE resource = delegate.locate(namespace, timestamp); + resource = delegate.locate(indexManager, + concurrencyManager, + discoveryManager, + namespace, + timestamp); +//BTM - FOR_CLIENT_SERVICE - END if (resource != null) { @@ -317,8 +336,17 @@ } // create a new instance of the relation. - resource = newInstance(cls, foundOn.get(), namespace, timestamp, - properties); +//BTM - FOR_CLIENT_SERVICE - BEGIN +//BTM - FOR_CLIENT_SERVICE resource = newInstance(cls, foundOn.get(), namespace, timestamp, +//BTM - FOR_CLIENT_SERVICE properties); + resource = newInstance(cls, + foundOn.get(),//indexManager + concurrencyManager, + discoveryManager, + namespace, + timestamp, + properties); +//BTM - FOR_CLIENT_SERVICE - END // Add to the cache. put(resource); @@ -544,9 +572,19 @@ * * @return A new instance of the identifed resource. */ +//BTM - FOR_CLIENT_SERVICE - BEGIN +//BTM - FOR_CLIENT_SERVICE protected T newInstance(final Class<? extends T> cls, +//BTM - FOR_CLIENT_SERVICE final IIndexManager indexManager, final String namespace, +//BTM - FOR_CLIENT_SERVICE final long timestamp, final Properties properties) { protected T newInstance(final Class<? 
extends T> cls, - final IIndexManager indexManager, final String namespace, - final long timestamp, final Properties properties) { + final IIndexManager indexManager, + final IConcurrencyManager concurrencyManager, + final IBigdataDiscoveryManagement discoveryManager, + final String namespace, + final long timestamp, + final Properties properties) + { +//BTM - FOR_CLIENT_SERVICE - END if (cls == null) throw new IllegalArgumentException(); @@ -563,12 +601,24 @@ final Constructor<? extends T> ctor; try { - ctor = cls.getConstructor(new Class[] {// - IIndexManager.class,// - String.class,// relation namespace - Long.class, // timestamp of the view - Properties.class // configuration properties. - }); +//BTM - FOR_CLIENT_SERVICE - BEGIN +//BTM - FOR_CLIENT_SERVICE ctor = cls.getConstructor(new Class[] {// +//BTM - FOR_CLIENT_SERVICE IIndexManager.class,// +//BTM - FOR_CLIENT_SERVICE String.class,// relation namespace +//BTM - FOR_CLIENT_SERVICE Long.class, // timestamp of the view +//BTM - FOR_CLIENT_SERVICE Properties.class // configuration properties. +//BTM - FOR_CLIENT_SERVICE }); + ctor = cls.getConstructor + (new Class[] + { IIndexManager.class, + IConcurrencyManager.class, + IBigdataDiscoveryManagement.class, + String.class, // relation namespace + Long.class, // timestamp of the view + Properties.class // configuration properties. 
+ } + ); +//BTM - FOR_CLIENT_SERVICE - END } catch (Exception e) { @@ -580,12 +630,24 @@ final T r; try { - r = ctor.newInstance(new Object[] {// - indexManager,// - namespace, // - timestamp, // - properties // - }); +//BTM - FOR_CLIENT_SERVICE - BEGIN +//BTM - FOR_CLIENT_SERVICE r = ctor.newInstance(new Object[] {// +//BTM - FOR_CLIENT_SERVICE indexManager,// +//BTM - FOR_CLIENT_SERVICE namespace, // +//BTM - FOR_CLIENT_SERVICE timestamp, // +//BTM - FOR_CLIENT_SERVICE properties // +//BTM - FOR_CLIENT_SERVICE }); + r = ctor.newInstance + (new Object[] + { indexManager, + concurrencyManager, + discoveryManager, + namespace, + timestamp, + properties + } + ); +//BTM - FOR_CLIENT_SERVICE - END r.init(); Modified: branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/IResourceLocator.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/IResourceLocator.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/IResourceLocator.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -32,6 +32,11 @@ import com.bigdata.relation.IRelation; import com.bigdata.service.IBigdataFederation; +//BTM - FOR_CLIENT_SERVICE +import com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; +import com.bigdata.journal.IIndexManager; + /** * An object that knows how to resolve a resource identifier (aka namespace) to * an {@link ILocatableResource} instance. "Locating" a relation means (a) @@ -68,6 +73,13 @@ * <code>null</code> if the resource declaration could not be * resolved. 
*/ - public T locate(String namespace, long timestamp); +//BTM - FOR_CLIENT_SERVICE - BEGIN +//BTM - FOR_CLIENT_SERVICE public T locate(String namespace, long timestamp); + public T locate(IIndexManager indexManager, + IConcurrencyManager concurrencyManager, + IBigdataDiscoveryManagement discoveryManager, + String namespace, + long timestamp); +//BTM - FOR_CLIENT_SERVICE - END } Modified: branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/ResourceLocatorMap.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/ResourceLocatorMap.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/relation/locator/ResourceLocatorMap.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -33,6 +33,11 @@ import com.bigdata.relation.IRelation; +//BTM - FOR_CLIENT_SERVICE +import com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; +import com.bigdata.journal.IIndexManager; + /** * A mapping between {@link String}s and {@link IResourceLocator}s. * This can be used to locate local, temporary or virtual relations. 
@@ -77,7 +82,15 @@ } - public T locate(String relationName, long timestamp) { +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE public T locate(String relationName, long timestamp) { + public T locate(IIndexManager indexManager, + IConcurrencyManager concurrencyManager, + IBigdataDiscoveryManagement discoveryManager, + String relationName, + long timestamp) + { +//BTM - PRE_CLIENT_SERVICE - END if (relationName == null) throw new IllegalArgumentException(); @@ -90,7 +103,14 @@ } - return relationLocator.locate(relationName, timestamp); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE return relationLocator.locate(relationName, timestamp); + return relationLocator.locate(indexManager, + concurrencyManager, + discoveryManager, + relationName, + timestamp); +//BTM - PRE_CLIENT_SERVICE - END } Modified: branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/AbstractStepTask.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/AbstractStepTask.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/relation/rule/eval/AbstractStepTask.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -766,9 +766,18 @@ if (!c.containsKey(relationIdentifier)) { - final IRelation relation = (IRelation) indexManager - .getResourceLocator().locate(relationIdentifier, - timestamp); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE final IRelation relation = (IRelation) indexManager +//BTM - PRE_CLIENT_SERVICE .getResourceLocator().locate(relationIdentifier, +//BTM - PRE_CLIENT_SERVICE timestamp); + final IRelation relation = + (IRelation) indexManager.getResourceLocator() + .locate(indexManager, + concurrencyManager, + discoveryManager, + relationIdentifier, + timestamp); +//BTM - PRE_CLIENT_SERVICE - BEGIN c.put(relationIdentifier, relation); @@ -839,9 +848,18 @@ if (!c.containsKey(relationName)) { - final IRelation relation = 
(IRelation) indexManager - .getResourceLocator().locate(relationName, - timestamp); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE final IRelation relation = (IRelation) indexManager +//BTM - PRE_CLIENT_SERVICE .getResourceLocator().locate(relationName, +//BTM - PRE_CLIENT_SERVICE timestamp); + final IRelation relation = + (IRelation) indexManager.getResourceLocator() + .locate(indexManager, + concurrencyManager, + discoveryManager, + relationName, + timestamp); +//BTM - PRE_CLIENT_SERVICE - END c.put(relationName, relation); Modified: branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -419,13 +419,17 @@ } //false ==> allow in-progress tasks to complete - queueStatsTaskFuture.cancel(false); + if (queueStatsTaskFuture != null) { + queueStatsTaskFuture.cancel(false); + } Util.shutdownExecutorService (scheduledExecutor, timeout, serviceName+".scheduledExecutor", logger); - threadPool.shutdownNow(); + if (threadPool != null) { + threadPool.shutdownNow(); + } //send one last event report (same logic as in AbstractFederation) new EventQueueSenderTask Modified: branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/resources/StoreManager.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -746,6 +746,7 @@ */ public boolean awaitRunning() { +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","*** StoreManager.awaitRunning >>> isOpen="+isOpen()+", 
isStarting="+isStarting()); while (isOpen() && isStarting()) { try { @@ -762,8 +763,10 @@ } +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","*** StoreManager.awaitRunning >>> END LOOP: isOpen="+isOpen()+", isStarting="+isStarting()); } +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","*** StoreManager.awaitRunning >>> RETURN isRunning="+isRunning()+"\n\n"); return isRunning(); } @@ -1358,21 +1361,21 @@ try { -System.out.println("\nStoreManager#Startup >>> start()"); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager#Startup >>> start()"); start(); // successful startup -System.out.println("StoreManager#Startup >>> set isStarting to FALSE"); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager#Startup >>> set isStarting to FALSE"); starting.set(false); // Purge any resources that we no longer require. -System.out.println("StoreManager#Startup >>> PURGE old resources during startup"); if(purgeOldResourcesDuringStartup) purgeOldResources(); } catch (Throwable ex) { -System.out.println("StoreManager#Startup >>> EXCEPTION >>> "+ex); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager#Startup >>> EXCEPTION >>> "+ex+"\n"+com.bigdata.util.Util.getThrowableStackTrace(ex)+"\n"); + // avoid possibility that isRunning() could become true. open.set(false); @@ -1392,7 +1395,7 @@ * flag is turned off. */ -System.out.println("StoreManager#Startup >>> FINALLY >>> set isStarting to FALSE"); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager#Startup >>> FINALLY >>> set isStarting to FALSE"); starting.set(false); if (log.isInfoEnabled()) @@ -1430,30 +1433,29 @@ final private void start() throws InterruptedException { if (!isStarting()) { - throw new IllegalStateException(); - } /* * Verify that the concurrency manager has been set and wait a while * it if is not available yet. 
*/ - { - int nwaits = 0; - while (true) { - try { - getConcurrencyManager(); - break; - } catch (IllegalStateException ex) { - Thread.sleep(100/* ms */); - if (++nwaits % 50 == 0) - log.warn("Waiting for concurrency manager"); - } - } - } + {//begin block + int nwaits = 0; + while (true) { + try { + getConcurrencyManager(); + break; + } catch (IllegalStateException ex) { + Thread.sleep(100/* ms */); + if (++nwaits % 50 == 0) + log.warn("Waiting for concurrency manager"); + } + }//end loop + }//end block - try { +//BTM - PRE_CLIENT_SERVICE try { +//BTM - PRE_CLIENT_SERVICE //BTM - BEGIN - PRE_CLIENT_SERVICE //BTM - PRE_CLIENT_SERVICE final IBigdataFederation<?> fed = getFederation(); //BTM - PRE_CLIENT_SERVICE if (fed == null) { @@ -1496,65 +1498,66 @@ //BTM - PRE_CLIENT_SERVICE log.warn("\n"+stackTrace+"\n"); //BTM - PRE_CLIENT_SERVICE} //BTM - PRE_CLIENT_SERVICE +//BTM - PRE_CLIENT_SERVICE } catch (UnsupportedOperationException ex) { +//BTM - PRE_CLIENT_SERVICE log.warn("Federation not available - running in test case?"); +//BTM - PRE_CLIENT_SERVICE } //BTM - maintain original logic for now - final IBigdataDiscoveryManagement discoveryMgr = + try { + final IBigdataDiscoveryManagement discoveryMgr = getDiscoveryManager(); - if (discoveryMgr == null) { - // Some of the unit tests do not start - // the txs until after the shard service. - // For those tests getDiscoveryManager() - // will return null during startup() of - // the shard service. To have a common - // code path, an exception is thrown - // here, but caught below. 
- throw new UnsupportedOperationException(); - } - // Wait no more than N seconds for discovery - int nWait = 30; - boolean discoveredTxnSrvc = false; - for(int i=0; i<nWait; i++) { - if (discoveryMgr.getTransactionService() - != null) - { - discoveredTxnSrvc = true; - break; - } - try { - Thread.sleep(1000L); - } catch(InterruptedException ie) { } - if (log.isDebugEnabled()) { - log.debug - ("waiting for transaction " - +"service discovery"); - } - } - if(discoveredTxnSrvc) { - if (log.isDebugEnabled()) { - log.debug - ("discovered transaction " - +"service"); - } - } else { - log.warn("transaction service " - +"unreachable"); -StackTraceElement[] e = (Thread.currentThread()).getStackTrace(); -StringBuffer buf = new StringBuffer(" "+(e[0]).toString()+"\n"); -for(int i=1;i<e.length;i++) { - buf.append(" "+(e[i]).toString()+"\n"); -} -String stackTrace = buf.toString(); -log.warn("\n"+stackTrace+"\n"); - }//endif(discoveredTxnSrvc) +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> DISCOVERY MGR = "+discoveryMgr+"\n"); + if (discoveryMgr == null) { + // Some of the unit tests do not start + // the txs until after the shard service. + // For those tests getDiscoveryManager() + // will return null during startup() of + // the shard service. To have a common + // code path, an exception is thrown + // here, but caught below. 
+ + throw new UnsupportedOperationException + ("null discoveryMgr"); + } + + // Wait no more than N seconds for discovery + int nWait = 120; + boolean discoveredTxnSrvc = false; + for(int i=0; i<nWait; i++) { + if (discoveryMgr.getTransactionService() != null) { + discoveredTxnSrvc = true; + break; + } + try { + Thread.sleep(1000L); + } catch(InterruptedException ie) { } + + if (log.isDebugEnabled()) { + log.debug("waiting for transaction " + +"service discovery"); + } + if(discoveredTxnSrvc) { + if (log.isDebugEnabled()) { + log.debug("discovered transaction service"); + } + } else { + log.warn("transaction service unreachable"); + }//endif(discoveredTxnSrvc) + }//endloop(nWait) +//BTM if(discoveredTxnSrvc) { +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> TRANSACTION SERVICE DISCOVERED"); +//BTM }else{ +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> TRANSACTION SERVICE UNREACHABLE\n"); +//BTM } + } catch (UnsupportedOperationException ex) { +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> FEDERATION UNAVAILABLE - test case?\n"); + log.warn("Federation not available - running in test case?"); + } //BTM - END - PRE_CLIENT_SERVICE - } catch (UnsupportedOperationException ex) { - log.warn("Federation not available - running in test case?"); - } - - /* - * Look for pre-existing data files. - */ + /* + * Look for pre-existing data files. 
+ */ if (!isTransient) { if (log.isInfoEnabled()) @@ -1562,7 +1565,9 @@ final Stats stats = new Stats(); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> scanDataDirectory [dataDir="+dataDir+"]"); scanDataDirectory(dataDir, stats); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.start >>> SCAN DONE [dataDir="+dataDir+", stats="+stats+"]"); final int nbad = stats.badFiles.size(); @@ -2122,7 +2127,7 @@ private void scanDataDirectory(File dir, Stats stats) throws InterruptedException { -System.out.println("\nStoreManager.scanDataDirectory >>> dir="+dir); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.scanDataDirectory >>> [dataDir="+dir+"]"); if (dir == null) throw new IllegalArgumentException(); @@ -2138,11 +2143,12 @@ if (file.isDirectory()) { -System.out.println("\nStoreManager.scanDataDirectory >>> dir="+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanDataDirectory >>> PROCESSING DIRECTORY [dir="+file+"]"); scanDataDirectory(file, stats); } else { -System.out.println("\nStoreManager.scanDataDirectory >>> scanFile: "+file); + +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanDataDirectory >>> scanFile [file="+file+"]"); scanFile(file, stats); } @@ -2152,7 +2158,6 @@ } private void scanFile(File file, Stats stats) throws InterruptedException { -System.out.println("\nStoreManager.scanFile >>> "+file); if (Thread.interrupted()) throw new InterruptedException(); @@ -2167,6 +2172,7 @@ // #of bytes in the file as reported by the OS. 
final long len = file.length(); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","\nStoreManager.scanFile >>> [file="+file+", length="+len+"]"); if (len > 0 && name.endsWith(Options.JNL)) { @@ -2182,7 +2188,7 @@ try { -System.out.println("\nStoreManager.scanFile >>> NEW ManagedJournal: "+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> JNL >>> NEW ManagedJournal"); tmp = new ManagedJournal(properties); } catch (Exception ex) { @@ -2194,6 +2200,7 @@ stats.badFiles.add(file.getAbsolutePath()); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> JNL >>> EXCEPTION - "+ex+"\n"+com.bigdata.util.Util.getThrowableStackTrace(ex)+"\n"); return; } @@ -2222,7 +2229,7 @@ final IndexSegmentStore segStore; try { -System.out.println("\nStoreManager.scanFile >>> NEW IndexSegmentStore: "+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> SEG >>> NEW IndexSegmentStore("+file+")"); segStore = new IndexSegmentStore(file); } catch (Exception ex) { @@ -2234,6 +2241,7 @@ stats.badFiles.add(file.getAbsolutePath()); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> SEG >>> EXCEPTION - "+ex+"\n"+com.bigdata.util.Util.getThrowableStackTrace(ex)+"\n"); return; } @@ -2270,7 +2278,7 @@ && (name.endsWith(Options.JNL) || name .endsWith(Options.SEG))) { -System.out.println("\nStoreManager.scanFile >>> Ignoring empty file: "+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> Ignoring empty file: "+file); log.warn("Ignoring empty file: " + file); } else { @@ -2279,7 +2287,7 @@ * This file is not relevant to the resource manager. 
*/ -System.out.println("\nStoreManager.scanFile >>> Ignoring irrelevant file: "+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> Ignoring irrelevant file: "+file); log.warn("Ignoring file: " + file); } @@ -2305,7 +2313,7 @@ // } // addResource(resource, file.getAbsoluteFile()); -System.out.println("\nStoreManager.scanFile >>> addResource: file="+file); +//BTM com.bigdata.util.Util.printStr("TestBigdataClientRemote.txt","StoreManager.scanFile >>> addResource: file="+file); addResource(resource, file); } @@ -3349,7 +3357,7 @@ this.releaseTime = txService.getReleaseTime(); //BTM -log.warn("\n*** StoreManager.purgeOldResources: this.releaseTime="+this.releaseTime+"\n"); +log.warn("*** StoreManager.purgeOldResources: this.releaseTime="+this.releaseTime); } else { Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractFederation.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractFederation.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractFederation.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -89,6 +89,7 @@ import com.bigdata.journal.ConcurrencyManager; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.LocalTransactionManager; +import java.io.File; /** * Abstract base class for {@link IBigdataFederation} implementations. 
@@ -113,6 +114,7 @@ //BTM - FOR_CLIENT_SERVICE - BEGIN protected ResourceManager fedResourceMgr; protected IConcurrencyManager fedConcurrencyMgr; +private static int dataDirCounter = 0; //BTM - FOR_CLIENT_SERVICE - END /** @@ -674,16 +676,50 @@ // tempStoreFactory = new TemporaryStoreFactory(this.client // .getTempStoreMaxExtent()); -//BTM - FOR_CLIENT_SERVICE - BEGIN - for passing concurrencyMgr to getGlobalFileSystem and TemporaryStoreFactory.getTempStore +//BTM - FOR_CLIENT_SERVICE - BEGIN +//BTM - FOR_CLIENT_SERVICE - NOTE: getGlobalFileSystem and TemporaryStoreFactory.getTempStore +//BTM - FOR_CLIENT_SERVICE - now expect a ConcurrencyManager to be passed into them. +//BTM - FOR_CLIENT_SERVICE - Thus, the code below was added to provide a ConcurrencyManager +//BTM - FOR_CLIENT_SERVICE - that this AbstractFederation can input when those methods +//BTM - FOR_CLIENT_SERVICE - are invoked. It is not expected that the ConcurrencyManager +//BTM - FOR_CLIENT_SERVICE - created here will be employed in other places in the +//BTM - FOR_CLIENT_SERVICE - code. Note also that in order to create a ConcurrencyManager, +//BTM - FOR_CLIENT_SERVICE - a ResourceManager must first be created and, that ResourceManager +//BTM - FOR_CLIENT_SERVICE - expects the properties that are input to its constructor +//BTM - FOR_CLIENT_SERVICE - will include a valid value for the property named, +//BTM - FOR_CLIENT_SERVICE - "com.bigdata.resources.StoreManager.dataDir". Without this +//BTM - FOR_CLIENT_SERVICE - property, the StoreManager on which the ResourceManager +//BTM - FOR_CLIENT_SERVICE - is based will fail to start. 
Additionally, it is very +//BTM - FOR_CLIENT_SERVICE - important that the value to which that property is set +//BTM - FOR_CLIENT_SERVICE - be unique for each instance of the ResourceManager/StoreManager +//BTM - FOR_CLIENT_SERVICE - that is created; otherwise, the StoreManager will again +//BTM - FOR_CLIENT_SERVICE - fail to start, and will ultimately throw an +//BTM - FOR_CLIENT_SERVICE - OverlappingFileLockException. Thus, to satisfy this +//BTM - FOR_CLIENT_SERVICE - requirement, the static variable named, 'dataDirCounter' +//BTM - FOR_CLIENT_SERVICE - is declared and incremented for each instantiation of +//BTM - FOR_CLIENT_SERVICE - this class; which provides a unique token that is used +//BTM - FOR_CLIENT_SERVICE - to modify the value of the system property relative to +//BTM - FOR_CLIENT_SERVICE - the other instances of the ResourceManager that are +//BTM - FOR_CLIENT_SERVICE - are created. + Properties resourceMgrProps = (Properties) (client.getProperties()).clone(); + resourceMgrProps.setProperty + ("com.bigdata.resources.StoreManager.dataDir", + System.getProperty("java.io.tmpdir") + +File.separator + +(FedResourceManager.class).getName() + +File.separator + +"StoreManager" + +File.separator + +"dataDir_"+(dataDirCounter++)); this.fedResourceMgr = new FedResourceManager ( (IBigdataDiscoveryManagement)this, (ILocalResourceManagement)this, (IIndexManager)this, - client.getProperties() ); + resourceMgrProps ); this.fedConcurrencyMgr = new ConcurrencyManager - (client.getProperties(), + (resourceMgrProps, new LocalTransactionManager ( (IBigdataDiscoveryManagement)this ), this.fedResourceMgr); @@ -798,7 +834,6 @@ assertOpen(); try { - UUID indexUUID = getMetadataService().registerScaleOutIndex( metadata, separatorKeys, dataServiceUUIDs); @@ -839,22 +874,18 @@ public void dropIndex(String name) { -String dbgFlnm = "TestEmbeddedClient.txt"; if (log.isInfoEnabled()) log.info("name=" + name); assertOpen(); -com.bigdata.util.Util.printStr(dbgFlnm, " 
AbstractFederation.dropIndex[name="+name+"] - assertOpen = OK"); try { -com.bigdata.util.Util.printStr(dbgFlnm, " AbstractFederation.dropIndex - metadataService = "+getMetadataService()); getMetadataService().dropScaleOutIndex(name); if (log.isInfoEnabled()) log.info("dropped scale-out index."); -com.bigdata.util.Util.printStr(dbgFlnm, " AbstractFederation.dropIndex - getIndexCache = "+getIndexCache()); getIndexCache().dropIndexFromCache(name); } catch (Exception e) { @@ -1752,7 +1783,7 @@ this.shutdown(); } - // For tests + // For tasks started by ClientService and for tests public IConcurrencyManager getConcurrencyManager() { return fedConcurrencyMgr; Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractIndexCache.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractIndexCache.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractIndexCache.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -105,14 +105,16 @@ final NT nt = new NT(name, timestamp); // test cache before synchronization. +log.warn("\nAbstractIndexCache.getIndex >>> 1a. ndx = indexCache.get("+nt+")\n"); T ndx = indexCache.get(nt); if (ndx != null) { +log.warn("\nAbstractIndexCache.getIndex >>> 1b. NOT NULL - ndx = "+ndx+")\n"); return ndx; } -log.warn("\nAbstractIndexCache.constructor >>> 1. ndx == NULL\n"); +log.warn("\nAbstractIndexCache.getIndex >>> 1b. ndx == NULL\n"); /* * Acquire a lock for the index name and timestamp. This allows @@ -123,25 +125,29 @@ try { +log.warn("\nAbstractIndexCache.getIndex >>> 2a. ndx == indexCache.get("+nt+")\n"); ndx = indexCache.get(nt); if (ndx == null) { -log.warn("\nAbstractIndexCache.constructor >>> 2. ndx == NULL\n"); +log.warn("\nAbstractIndexCache.getIndex >>> 2b. ndx == NULL\n"); +log.warn("\nAbstractIndexCache.getIndex >>> 3a. 
ndx = newView("+nt+")\n"); if ((ndx = newView(name, timestamp)) == null) { -log.warn("\nAbstractIndexCache.constructor >>> 3. newView -- indx == NULL\n"); if (INFO) log.info("name=" + name + " @ " + timestamp + " : no such index."); +log.warn("\nAbstractIndexCache.getIndex >>> 3b. newView("+nt+") = NULL >>> RETURN\n"); return null; - } + }//(newView == null) +log.warn("\nAbstractIndexCache.getIndex >>> 3b. NOT NULL - newView("+nt+") = "+ndx+"\n"); // add to the cache. // indexCache.put(nt, ndx, false/* dirty */); indexCache.put(nt, ndx); +log.warn("\nAbstractIndexCache.getIndex >>> 3c. indexCache.put("+nt+", "+ndx+")\n"); if (INFO) log.info("name=" + name + " @ " @@ -153,8 +159,9 @@ log.info("name=" + name + " @ " + timestamp + " : cache hit."); - } + }//endif(ndx == null) +log.warn("\nAbstractIndexCache.getIndex >>> 4. FINAL RETURN - ndx = "+ndx+"\n"); return ndx; } finally { @@ -191,8 +198,11 @@ final Map.Entry<NT, WeakReference<T>> entry = itr.next(); final T ndx = entry.getValue().get(); +log.warn("\nAbstractIndexCache.dropIndexFromCache >>> 1a. entry.getValue().get() = "+ndx+"\n"); + if(ndx == null) { +log.warn("\nAbstractIndexCache.dropIndexFromCache >>> 1b. ndx = NULL >>> NEXT ndx\n"); /* * The entry under the key has been cleared so we just skip @@ -220,16 +230,21 @@ + name + " @ " + timestamp); // remove from the cache. - indexCache.remove(entry.getKey()); +//BTM indexCache.remove(entry.getKey()); +Object retVal = indexCache.remove(entry.getKey()); +log.warn("\nAbstractIndexCache.dropIndexFromCache >>> 2. indexCache.remove("+entry.getKey()+") >>> DROPPED [KEY="+entry.getKey()+", VAL="+retVal+"]\n"); - } + }//endif(timestamp == ITx.UNISOLATED || ITx.READ_COMMITTED) - } + }//endif(name.equals(nt.getName) - } + }//end loop - } +log.warn("\nAbstractIndexCache.dropIndexFromCache >>> 3a. VERIFY - getIndex("+name+", ITx.READ_COMMITTED) = "+getIndex(name, ITx.READ_COMMITTED)+"]"); +log.warn("AbstractIndexCache.dropIndexFromCache >>> 3b. 
VERIFY - getIndex("+name+", ITx.UNISOLATED) = "+getIndex(name, ITx.UNISOLATED)+"]\n"); + }//end sync(indexCache) + } protected void shutdown() { Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -269,14 +269,14 @@ assertOpen(); -//BTM return getMetadataIndexCache().getIndex(name, timestamp); + return getMetadataIndexCache().getIndex(name, timestamp); -MetadataIndexCache cache = getMetadataIndexCache(); -IMetadataIndex index = cache.getIndex(name, timestamp); -log.warn("\n>>>>AbstractScaleOutFederation.getMetadataIndex: name="+name+", timestamp="+timestamp+", metadataIndexCache="+cache+", metadataIndex="+index+"\n"); -return index; +//BTM - PRE_CLIENT_SERVICE MetadataIndexCache cache = getMetadataIndexCache(); +//BTM - PRE_CLIENT_SERVICE IMetadataIndex index = cache.getIndex(name, timestamp); +//BTM - PRE_CLIENT_SERVICE log.warn("\n>>>>AbstractScaleOutFederation.getMetadataIndex: name="+name+", timestamp="+timestamp+", metadataIndexCache="+cache+", metadataIndex="+index+"\n"); +//BTM - PRE_CLIENT_SERVICE return index; } - + /** * Returns an iterator that will visit the {@link PartitionLocator}s for * the specified scale-out index key range. @@ -477,8 +477,7 @@ int ntries = 0; // updated each time through the loop. -//BTM IMetadataService metadataService = null; -ShardLocator metadataService = null; + ShardLocator metadataService = null; // updated each time through the loop. 
UUID[] dataServiceUUIDs = null; @@ -646,6 +645,20 @@ } +//BTM - FOR_CLIENT_SERVICE - BEGIN ----------------------------------------------------------- +//BTM - FOR_CLIENT_SERVICE - NOTE: this method was added to address trac issue #190 + public void dropIndex(String name) { + super.dropIndex(name); + try { + getMetadataIndexCache().dropIndexFromCache(name); + getIndexCache().dropIndexFromCache(name); + } catch (Exception e) {//maintain same logic as super.dropIndex? + throw new RuntimeException( e ); + } + } +//BTM - FOR_CLIENT_SERVICE - END ------------------------------------------------------------- + + //BTM - PRE_CLIENT_SERVICE - BEGIN - moved to standalone classes ------------------------------------------------------------------- //BTM - PRE_CLIENT_SERVICE /** //BTM - PRE_CLIENT_SERVICE //BTM - PRE_CLIENT_SERVICE * Task directs a {@link ShardService} to purge any unused resources and to Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientService.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientService.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientService.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -37,6 +37,7 @@ import com.bigdata.Banner; //BTM - FOR_CLIENT_SERVICE +import com.bigdata.discovery.IBigdataDiscoveryManagement; import com.bigdata.journal.IIndexManager; import com.bigdata.service.jini.JiniFederation; import com.bigdata.resources.ILocalResourceManagement; @@ -184,7 +185,9 @@ String zkRoot = fed.getZooConfig().zroot; return getFederation().getExecutorService().submit ( new ClientTaskWrapper( (IIndexManager)fed, + fed.getConcurrencyManager(), (ILocalResourceManagement)fed, + (IBigdataDiscoveryManagement)fed, this,//embeddedCallableExecutor task, zkClient, zkAcl, zkRoot) ); Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientTaskWrapper.java 
=================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientTaskWrapper.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/ClientTaskWrapper.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -24,6 +24,8 @@ package com.bigdata.service; +import com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; import com.bigdata.journal.IIndexManager; import com.bigdata.resources.ILocalResourceManagement; @@ -37,7 +39,9 @@ public class ClientTaskWrapper<T> implements Callable<T> { private IIndexManager indexMgr; + private IConcurrencyManager concurrencyMgr; private ILocalResourceManagement localResourceMgr; + private IBigdataDiscoveryManagement discoveryMgr; private CallableExecutor embeddedCallableExecutor; private IClientServiceCallable<T> task; private ZooKeeper zkClient; @@ -45,7 +49,9 @@ private String zkRoot; public ClientTaskWrapper(IIndexManager indexManager, + IConcurrencyManager concurrencyManager, ILocalResourceManagement localResourceManager, + IBigdataDiscoveryManagement discoveryManager, CallableExecutor embeddedCallableExecutor, IClientServiceCallable<T> task, ZooKeeper zookeeperClient, @@ -53,7 +59,9 @@ String zookeeperRoot) { this.indexMgr = indexManager; + this.concurrencyMgr = concurrencyManager; this.localResourceMgr = localResourceManager; + this.discoveryMgr = discoveryManager; this.embeddedCallableExecutor = embeddedCallableExecutor; this.task = task; this.zkClient = zookeeperClient; @@ -63,7 +71,8 @@ public T call() throws Exception { return task.startClientTask - (indexMgr, localResourceMgr, + (indexMgr, concurrencyMgr, + localResourceMgr, discoveryMgr, embeddedCallableExecutor, zkClient, zkAcl, zkRoot); } Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/IClientServiceCallable.java =================================================================== --- 
branches/dev-btm/bigdata/src/java/com/bigdata/service/IClientServiceCallable.java 2010-11-08 21:31:17 UTC (rev 3918) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/IClientServiceCallable.java 2010-11-08 23:56:01 UTC (rev 3919) @@ -17,6 +17,8 @@ package com.bigdata.service; +import com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; import com.bigdata.journal.IIndexManager; import com.bigdata.resources.ILocalResourceManagement; @@ -50,7 +52,9 @@ * @throws Exception if unable to compute a result */ V startClientTask(IIndexManager indexManager, + IConcurrencyManager concurrencyManager, ILocalResourceManagement localResourceManager, + IBigdataDiscoveryManagement discoveryManager, CallableExecutor embeddedCallableExecutor, ZooKeeper zookeeperClient, List<ACL> zookeeperAcl, Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/IndexCache.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/big... [truncated message content] |
From: <btm...@us...> - 2010-11-10 00:40:36
|
Revision: 3923 http://bigdata.svn.sourceforge.net/bigdata/?rev=3923&view=rev Author: btmurphy Date: 2010-11-10 00:40:29 +0000 (Wed, 10 Nov 2010) Log Message: ----------- [branch dev-btm]: CHECKPOINT - phase 1 of callable executor (client service) smart proxy work. Includes fixes to issues identified by the rdf tests Modified Paths: -------------- branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/QuorumPeerStateV0.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/LocalTripleStore.java branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java Modified: branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-11-09 14:16:13 UTC (rev 3922) +++ branches/dev-btm/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-11-10 00:40:29 UTC (rev 3923) @@ -317,7 +317,7 @@ * until existing transactions (both read-write and read-only) are complete * (either aborted or committed). */ - public void shutdown() { + public void shutdown0() { if(log.isInfoEnabled()) log.info(""); @@ -338,7 +338,8 @@ try { // wait for running transactions to complete. 
- awaitRunningTx(10/* logTimeout */, TimeUnit.MILLISECONDS); +//BTM - FOR_CLIENT_SERVICE awaitRunningTx(10/* logTimeout */, TimeUnit.MILLISECONDS); +awaitRunningTx(10L*1000L/* logTimeout */, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/QuorumPeerStateV0.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/QuorumPeerStateV0.java 2010-11-09 14:16:13 UTC (rev 3922) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/QuorumPeerStateV0.java 2010-11-10 00:40:29 UTC (rev 3923) @@ -55,7 +55,7 @@ private int initLimit = 5; private int syncLimit = 2; private int electionAlg = 3;//0=udp, 3=tcp - private int maxClientCnxns = 10; + private int maxClientCnxns = 0;//0 ==> unlimited, 10 is default private Map<Long, QuorumPeerData> peerDataMap = new TreeMap<Long, QuorumPeerData>();//order by peerId Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java 2010-11-09 14:16:13 UTC (rev 3922) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java 2010-11-10 00:40:29 UTC (rev 3923) @@ -740,12 +740,31 @@ final ITextIndexer tmp; try { final Class<?> vfc = determineTextIndexerClass(); - final Method gi = vfc.getMethod("getInstance", - IIndexManager.class, String.class, Long.class, - Properties.class); - tmp = (ITextIndexer) gi.invoke(null/* object */, - getIndexManager(), getNamespace(), - getTimestamp(), getProperties()); +//BTM - PRE_CLIENT_SERVICE - BEGIN +//BTM - PRE_CLIENT_SERVICE final Method gi = vfc.getMethod("getInstance", +//BTM - PRE_CLIENT_SERVICE IIndexManager.class, String.class, Long.class, +//BTM - PRE_CLIENT_SERVICE Properties.class); +//BTM - PRE_CLIENT_SERVICE tmp = (ITextIndexer) 
gi.invoke(null/* object */, +//BTM - PRE_CLIENT_SERVICE getIndexManager(), getNamespace(), +//BTM - PRE_CLIENT_SERVICE getTimestamp(), getProperties()); + final Method gi = + vfc.getMethod("getInstance", + IIndexManager.class, + IConcurrencyManager.class, + IBigdataDiscoveryManagement.class, + String.class, + Long.class, + Properties.class); + tmp = + (ITextIndexer) gi.invoke + (null,//object + getIndexManager(), + getConcurrencyManager(), + getDiscoveryManager(), + getNamespace(), + getTimestamp(), + getProperties()); +//BTM - PRE_CLIENT_SERVICE - END if(tmp instanceof ILocatableResource<?>) { ((ILocatableResource<?>)tmp).init(); } Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java 2010-11-09 14:16:13 UTC (rev 3922) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java 2010-11-10 00:40:29 UTC (rev 3923) @@ -502,12 +502,7 @@ if (indexManager == null) throw new IllegalArgumentException(); //BTM - FOR_CLIENT_SERVICE - BEGIN - if (concurrencyManager == null) { - throw new IllegalArgumentException("null concurrencyManager"); - } - if (discoveryManager == null) { - throw new IllegalArgumentException("null discoveryManager"); - } + //allowed to be null this.concurrencyManager = concurrencyManager; this.discoveryManager = discoveryManager; //BTM - FOR_CLIENT_SERVICE - END Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/LocalTripleStore.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/LocalTripleStore.java 2010-11-09 14:16:13 UTC (rev 3922) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/LocalTripleStore.java 2010-11-10 00:40:29 UTC (rev 3923) @@ -36,6 +36,10 @@ import com.bigdata.rdf.spo.SPORelation; import 
com.bigdata.relation.locator.DefaultResourceLocator; +//BTM - FOR_CLIENT_SERVICE +import com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; + /** * A triple store based on the <em>bigdata</em> architecture. This class * offers extremely low latency for index operations. All indices are local @@ -152,7 +156,7 @@ //BTM - PRE_CLIENT_SERVICE - BEGIN //BTM - PRE_CLIENT_SERVICE - NOTE: the super class (AbstractLocalTripleStore --> AbstractTripleStore) now takes an //BTM - PRE_CLIENT_SERVICE - IConcurrencyManager and an IBigdataDiscoveryManagment instance for scale out. -//BTM - PRE_CLIENT_SERVICE - It's not clear whether passing null for those parameters will be a problem +//BTM - PRE_CLIENT_SERVICE - It's not clear whether passing null for those parameters will be a problem or //BTM - PRE_CLIENT_SERVICE - not. Need to monitor the tests for NullPointerExceptions. //BTM - PRE_CLIENT_SERVICE //BTM - PRE_CLIENT_SERVICE super(indexManager, namespace, timestamp, properties); @@ -166,6 +170,31 @@ } +//BTM - FOR_CLIENT_SERVICE - BEGIN +//BTM - FOR_CLIENT_SERVICE - NOTE: DefaultResourceLocator now uses reflection +//BTM - FOR_CLIENT_SERVICE - to invoke a constructor with the arguments +//BTM - FOR_CLIENT_SERVICE - specified below, rather than the original +//BTM - FOR_CLIENT_SERVICE - constructor shown above. This constructor +//BTM - FOR_CLIENT_SERVICE - was added to allow for that reflection case. + public LocalTripleStore + (final IIndexManager indexManager, + final IConcurrencyManager concurrencyManager, + final IBigdataDiscoveryManagement discoveryManager, + final String namespace, + final Long timestamp, + final Properties properties) + { + super(indexManager, + concurrencyManager, + discoveryManager, + namespace, + timestamp, + properties); + + store = (Journal) indexManager; + } +//BTM - FOR_CLIENT_SERVICE - END + /** * Create or re-open a triple store using a local embedded database. 
*/ Modified: branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java =================================================================== --- branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-11-09 14:16:13 UTC (rev 3922) +++ branches/dev-btm/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-11-10 00:40:29 UTC (rev 3923) @@ -163,6 +163,8 @@ import com.bigdata.journal.TransactionService; //BTM - FOR_CLIENT_SERVICE +import com.bigdata.discovery.IBigdataDiscoveryManagement; +import com.bigdata.journal.IConcurrencyManager; import com.bigdata.relation.locator.IResourceLocator; /** * <p> @@ -1224,6 +1226,13 @@ .getTransactionManager().getTransactionService(); final String namespace = database.getNamespace(); + +//BTM - FOR_CLIENT_SERVICE - BEGIN + final IConcurrencyManager concurrencyManager = + database.getConcurrencyManager(); + final IBigdataDiscoveryManagement discoveryManager = + database.getDiscoveryManager(); +//BTM - FOR_CLIENT_SERVICE - END final Lock readLock = lock.readLock(); readLock.lock(); @@ -1255,11 +1264,12 @@ //BTM - FOR_CLIENT_SERVICE final AbstractTripleStore txView = (AbstractTripleStore) indexManager //BTM - FOR_CLIENT_SERVICE .getResourceLocator().locate(namespace, tx); IResourceLocator locator = indexManager.getResourceLocator(); +log.warn("\n*** concurrencyManager = "+concurrencyManager+", discoveryManager = "+discoveryManager+"\n"); final AbstractTripleStore txView = (AbstractTripleStore) locator.locate (indexManager, - database.getConcurrencyManager(), - database.getDiscoveryManager(), + concurrencyManager, + discoveryManager, namespace, tx); //BTM - FOR_CLIENT_SERVICE - END This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <btm...@us...> - 2010-11-24 21:40:14
|
Revision: 3984 http://bigdata.svn.sourceforge.net/bigdata/?rev=3984&view=rev Author: btmurphy Date: 2010-11-24 21:40:07 +0000 (Wed, 24 Nov 2010) Log Message: ----------- [branch dev-btm]: CHECKPOINT - modified shard, shardlocator, and executor ServiceImpl and related config files to use QuorumPeerManager to initialize the ZooKeeperAccessor utility instead of using only the 'servers' in the config file Modified Paths: -------------- branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/Constants.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/executor.config branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/logging.properties branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/quorum/QuorumPeerManager.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/Constants.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/logging.properties branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/shardlocator.config branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/Constants.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/logging.properties branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/shard.config Modified: branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata/src/java/com/bigdata/resources/LocalResourceManager.java 2010-11-24 21:40:07 UTC (rev 3984) @@ -331,6 +331,8 @@ + 
ICounterSet.pathSeparator + IResourceManagerCounters.IndexManager ); +//BTM - FIX NullPointerException - BEGIN +if (tmp3 != null) { synchronized (tmp3) { // Note: detach and then attach since that wipes out @@ -355,6 +357,8 @@ } } }//end live indices +}//endif(tmp3 != null) +//BTM - FIX NullPointerException - BEGIN lastReattachMillis = now; Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/Constants.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/Constants.java 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/Constants.java 2010-11-24 21:40:07 UTC (rev 3984) @@ -121,4 +121,13 @@ Boolean.FALSE; // Boolean.parseBoolean // (IBigdataClient.Options.DEFAULT_COLLECT_PLATFORM_STATISTICS); + + // ZooKeeper client session timeout in seconds. Note that for the + // typical tick time of 2 seconds per tick, the session timeout + // should/will be set to a value between 4 and 40 seconds; because + // ZooKeeper requires that the session timeout always fall between + // 2 and 20 ticks. 
+ int LOWER_BOUND_ZK_SESSION_TIMEOUT = 1; + int UPPER_BOUND_ZK_SESSION_TIMEOUT = Integer.MAX_VALUE; + int DEFAULT_UPPER_BOUND_ZK_SESSION_TIMEOUT = 40; } Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java 2010-11-24 21:40:07 UTC (rev 3984) @@ -29,6 +29,7 @@ import com.bigdata.attr.ServiceInfo; import com.bigdata.io.SerializerUtil; +import com.bigdata.jini.quorum.QuorumPeerManager; import com.bigdata.jini.start.BigdataZooDefs; import com.bigdata.jini.util.ConfigMath; import com.bigdata.service.IClientServiceCallable; @@ -139,8 +140,6 @@ private EmbeddedCallableExecutor embeddedCallableExecutor; -//BTM private Thread waitThread; - /* Constructor used by Service Starter Framework to start this service */ public ServiceImpl(String[] args, LifeCycle lifeCycle) throws Exception { @@ -310,11 +309,6 @@ serviceId = bootStateUtil.getServiceId(); logger.debug("smsProxyId = null - service generated & persisted " +"(or retreieved) its own proxy id ["+proxyId+"]"); - - setZookeeperConfigInfo(config); - zookeeperAccessor = - new ZooKeeperAccessor - (zookeeperServers, zookeeperSessionTimeout); } else {//ServicesConfiguration pre-generated the proxy id proxyId = smsProxyId; serviceId = com.bigdata.jini.util.JiniUtil.uuid2ServiceID(proxyId); @@ -362,7 +356,8 @@ //properties object for the EmbeddedCallableExecutor Properties props = new Properties(); - props.setProperty("com.bigdata.resources.StoreManager.dataDir", dataDir); + props.setProperty + ("com.bigdata.resources.StoreManager.dataDir", dataDir); int threadPoolSize = Config.getIntEntry(config, COMPONENT_NAME, "threadPoolSize", @@ -462,6 +457,12 @@ Boolean.FALSE); this.sdm = new ServiceDiscoveryManager(ldm, null, config); + if 
(zookeeperAccessor == null) { + setZookeeperConfigInfo(config, this.sdm); + zookeeperAccessor = + new ZooKeeperAccessor + (zookeeperServers, zookeeperSessionTimeout); + } embeddedCallableExecutor = new EmbeddedCallableExecutor @@ -505,9 +506,6 @@ +", locators=" +Util.writeArrayElementsToString(locatorsToJoin)); -//BTM waitThread = new Util.WaitOnInterruptThread(logger); -//BTM waitThread.start(); - readyState.ready();//ready to accept calls from clients } @@ -566,13 +564,14 @@ futureExporters.removeAll(removeSet); } -//BTM waitThread.interrupt(); -//BTM try { -//BTM waitThread.join(); -//BTM } catch (InterruptedException e) {/*exiting, so swallow*/} - + if (zookeeperAccessor != null) { + try { + zookeeperAccessor.close(); + } catch(InterruptedException e) {//swallow + } + } Util.cleanupOnExit - (innerProxy, serverExporter, futureExporters, joinMgr, sdm, ldm); + (innerProxy, serverExporter, futureExporters, joinMgr, sdm, ldm); // Tell the ServiceStarter framework it's ok to release for gc if(lifeCycle != null) { @@ -756,7 +755,22 @@ logger.debug("[main]: smsProxyId="+smsProxyId); } - setZookeeperConfigInfo(smsConfig); + String[] tmpGroups = + (String[])smsConfig.getEntry + ("com.bigdata.service.jini.JiniClient", "groups", + String[].class, DiscoveryGroupManagement.NO_GROUPS); + LookupLocator[] tmpLocs = + (LookupLocator[])smsConfig.getEntry + ("com.bigdata.service.jini.JiniClient", "locators", + LookupLocator[].class, new LookupLocator[]{ }); + DiscoveryManagement tmpLdm = + new LookupDiscoveryManager(tmpGroups, tmpLocs, null); + ServiceDiscoveryManager tmpSdm = + new ServiceDiscoveryManager(tmpLdm, null); + + setZookeeperConfigInfo(smsConfig, tmpSdm); + tmpLdm.terminate(); + tmpSdm.terminate(); zookeeperAccessor = new ZooKeeperAccessor (zookeeperServers, zookeeperSessionTimeout); @@ -779,7 +793,8 @@ ("[main]: logicalServiceZPath="+logicalServiceZPath); if(physicalServiceZPath != null) { byte[] data = SerializerUtil.serialize(smsEntries); - ZooKeeper zookeeperClient 
= zookeeperAccessor.getZookeeper(); + ZooKeeper zookeeperClient = + zookeeperAccessor.getZookeeper(); logger.debug("[main]: zookeeper client created"); try { zookeeperClient.create @@ -789,7 +804,7 @@ +"[physicalServiceZPath=" +physicalServiceZPath+"]"); } catch(NodeExistsException e) { - zookeeperClient.setData(physicalServiceZPath, data, -1); + zookeeperClient.setData(physicalServiceZPath,data,-1); logger.debug("[main]: zookeeper znode updated " +"[physicalServiceZPath=" +physicalServiceZPath+"]"); @@ -810,8 +825,10 @@ } } - private static void setZookeeperConfigInfo(Configuration zkConfig) - throws ConfigurationException + private static void setZookeeperConfigInfo + (Configuration zkConfig, + ServiceDiscoveryManager srvcDiscMgr) + throws ConfigurationException, IOException { String zkComponent = "org.apache.zookeeper.ZooKeeper"; @@ -823,18 +840,11 @@ } logger.debug("zookeepeRoot="+zookeeperRoot); - zookeeperServers = - (String)zkConfig.getEntry - (zkComponent, "servers", String.class, null); - if(zookeeperServers == null) { - throw new ConfigurationException - ("zookeeper servers not specified"); - } - logger.debug("zookeeperServers="+zookeeperServers); - - zookeeperSessionTimeout = - (Integer)zkConfig.getEntry - (zkComponent, "sessionTimeout", int.class, 300000); + zookeeperSessionTimeout = + Config.getIntEntry(zkConfig, zkComponent, "sessionTimeout", + DEFAULT_UPPER_BOUND_ZK_SESSION_TIMEOUT, + LOWER_BOUND_ZK_SESSION_TIMEOUT, + UPPER_BOUND_ZK_SESSION_TIMEOUT); logger.debug("zookeeperSessionTimeout="+zookeeperSessionTimeout); ACL[] acl = (ACL[])zkConfig.getEntry @@ -844,5 +854,22 @@ } zookeeperAcl = Arrays.asList(acl); logger.debug("zookeeperAcl="+zookeeperAcl); + +//BTM - if config contains "servers" then by-pass dynamic discovery for now + zookeeperServers = + (String)zkConfig.getEntry + (zkComponent, "servers", String.class, null); + if(zookeeperServers == null) { + QuorumPeerManager tmpPeerMgr = + new QuorumPeerManager + (srvcDiscMgr, 
zookeeperSessionTimeout, logger); + if (tmpPeerMgr == null) { + throw new IOException("zookeeper ensemble unavailable"); + } + ZooKeeper.States zkState = tmpPeerMgr.getState(); + logger.debug("zookeeper state="+zkState); + zookeeperServers = tmpPeerMgr.getConnectString(); + } + logger.debug("zookeeperServers="+zookeeperServers); } } Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/executor.config =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/executor.config 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/executor.config 2010-11-24 21:40:07 UTC (rev 3984) @@ -111,13 +111,10 @@ com.bigdata.executor.serverILFactory, false, false); } -//NOTE: remove once dynamic discovery of zookeeper is added org.apache.zookeeper.ZooKeeper { zroot = ConfigDeployUtil.getString("federation.name"); - servers = com.bigdata.executor.serverExporterIpAddr+":2888:3888"; - acl = new ACL[] { new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone")) }; Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/logging.properties =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/logging.properties 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/config/logging.properties 2010-11-24 21:40:07 UTC (rev 3984) @@ -38,3 +38,4 @@ #log4j.logger.com.bigdata.executor=DEBUG #log4j.logger.com.bigdata.executor.EmbeddedCallableExecutor=DEBUG +#org.apache.zookeeper=DEBUG Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/quorum/QuorumPeerManager.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/quorum/QuorumPeerManager.java 2010-11-23 21:39:34 UTC (rev 3983) +++ 
branches/dev-btm/bigdata-jini/src/java/com/bigdata/jini/quorum/QuorumPeerManager.java 2010-11-24 21:40:07 UTC (rev 3984) @@ -406,6 +406,11 @@ } } + // Other public methods defined by this class, not defined by ZooKeeper + public String getConnectString() { + return connectString; + } + // Private methods private ZooKeeper getClient() throws IOException { Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/Constants.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/Constants.java 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/Constants.java 2010-11-24 21:40:07 UTC (rev 3984) @@ -122,4 +122,13 @@ Boolean.FALSE; // Boolean.parseBoolean // (IBigdataClient.Options.DEFAULT_COLLECT_PLATFORM_STATISTICS); + + // ZooKeeper client session timeout in seconds. Note that for the + // typical tick time of 2 seconds per tick, the session timeout + // should/will be set to a value between 4 and 40 seconds; because + // ZooKeeper requires that the session timeout always fall between + // 2 and 20 ticks. 
+ int LOWER_BOUND_ZK_SESSION_TIMEOUT = 1; + int UPPER_BOUND_ZK_SESSION_TIMEOUT = Integer.MAX_VALUE; + int DEFAULT_UPPER_BOUND_ZK_SESSION_TIMEOUT = 40; } Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/ServiceImpl.java 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/ServiceImpl.java 2010-11-24 21:40:07 UTC (rev 3984) @@ -33,6 +33,7 @@ import com.bigdata.btree.filter.IFilterConstructor; import com.bigdata.btree.proc.IIndexProcedure; import com.bigdata.io.SerializerUtil; +import com.bigdata.jini.quorum.QuorumPeerManager; import com.bigdata.jini.start.BigdataZooDefs; import com.bigdata.jini.util.ConfigMath; import com.bigdata.mdi.PartitionLocator; @@ -142,8 +143,6 @@ private EmbeddedShardLocator embeddedShardLocator; -//BTM private Thread waitThread; - /* Constructor used by Service Starter Framework to start this service */ public ServiceImpl(String[] args, LifeCycle lifeCycle) throws Exception { System.out.println("\nZZZZZ SHARD LOCATOR ServiceImpl: constructor"); @@ -440,11 +439,6 @@ serviceId = bootStateUtil.getServiceId(); logger.debug("smsProxyId = null - service generated & persisted " +"(or retreieved) its own proxy id ["+proxyId+"]"); - - setZookeeperConfigInfo(config); - zookeeperAccessor = - new ZooKeeperAccessor - (zookeeperServers, zookeeperSessionTimeout); } else {//ServicesConfiguration pre-generated the proxy id proxyId = smsProxyId; serviceId = com.bigdata.jini.util.JiniUtil.uuid2ServiceID(proxyId); @@ -590,6 +584,13 @@ Boolean.FALSE); this.sdm = new ServiceDiscoveryManager(ldm, null, config); + if (zookeeperAccessor == null) { + setZookeeperConfigInfo(config, this.sdm); + zookeeperAccessor = + new ZooKeeperAccessor + (zookeeperServers, zookeeperSessionTimeout); + } + embeddedShardLocator = new EmbeddedShardLocator (proxyId, hostname, 
@@ -631,9 +632,6 @@ +", locators=" +Util.writeArrayElementsToString(locatorsToJoin)); -//BTM waitThread = new Util.WaitOnInterruptThread(logger); -//BTM waitThread.start(); - readyState.ready();//ready to accept calls from clients } @@ -693,11 +691,12 @@ futureExporters.removeAll(removeSet); } -//BTM waitThread.interrupt(); -//BTM try { -//BTM waitThread.join(); -//BTM } catch (InterruptedException e) {/*exiting, so swallow*/} - + if (zookeeperAccessor != null) { + try { + zookeeperAccessor.close(); + } catch(InterruptedException e) {//swallow + } + } Util.cleanupOnExit (innerProxy, serverExporter, futureExporters, joinMgr, sdm, ldm); @@ -883,7 +882,22 @@ logger.debug("[main]: smsProxyId="+smsProxyId); } - setZookeeperConfigInfo(smsConfig); + String[] tmpGroups = + (String[])smsConfig.getEntry + ("com.bigdata.service.jini.JiniClient", "groups", + String[].class, DiscoveryGroupManagement.NO_GROUPS); + LookupLocator[] tmpLocs = + (LookupLocator[])smsConfig.getEntry + ("com.bigdata.service.jini.JiniClient", "locators", + LookupLocator[].class, new LookupLocator[]{ }); + DiscoveryManagement tmpLdm = + new LookupDiscoveryManager(tmpGroups, tmpLocs, null); + ServiceDiscoveryManager tmpSdm = + new ServiceDiscoveryManager(tmpLdm, null); + + setZookeeperConfigInfo(smsConfig, tmpSdm); + tmpLdm.terminate(); + tmpSdm.terminate(); zookeeperAccessor = new ZooKeeperAccessor (zookeeperServers, zookeeperSessionTimeout); @@ -906,7 +920,8 @@ ("[main]: logicalServiceZPath="+logicalServiceZPath); if(physicalServiceZPath != null) { byte[] data = SerializerUtil.serialize(smsEntries); - ZooKeeper zookeeperClient = zookeeperAccessor.getZookeeper(); + ZooKeeper zookeeperClient = + zookeeperAccessor.getZookeeper(); logger.debug("[main]: zookeeper client created"); try { zookeeperClient.create @@ -916,7 +931,7 @@ +"[physicalServiceZPath=" +physicalServiceZPath+"]"); } catch(NodeExistsException e) { - zookeeperClient.setData(physicalServiceZPath, data, -1); + 
zookeeperClient.setData(physicalServiceZPath,data,-1); logger.debug("[main]: zookeeper znode updated " +"[physicalServiceZPath=" +physicalServiceZPath+"]"); @@ -936,8 +951,10 @@ } } - private static void setZookeeperConfigInfo(Configuration zkConfig) - throws ConfigurationException + private static void setZookeeperConfigInfo + (Configuration zkConfig, + ServiceDiscoveryManager srvcDiscMgr) + throws ConfigurationException, IOException { String zkComponent = "org.apache.zookeeper.ZooKeeper"; @@ -949,18 +966,11 @@ } logger.debug("zookeepeRoot="+zookeeperRoot); - zookeeperServers = - (String)zkConfig.getEntry - (zkComponent, "servers", String.class, null); - if(zookeeperServers == null) { - throw new ConfigurationException - ("zookeeper servers not specified"); - } - logger.debug("zookeeperServers="+zookeeperServers); - - zookeeperSessionTimeout = - (Integer)zkConfig.getEntry - (zkComponent, "sessionTimeout", int.class, 300000); + zookeeperSessionTimeout = + Config.getIntEntry(zkConfig, zkComponent, "sessionTimeout", + DEFAULT_UPPER_BOUND_ZK_SESSION_TIMEOUT, + LOWER_BOUND_ZK_SESSION_TIMEOUT, + UPPER_BOUND_ZK_SESSION_TIMEOUT); logger.debug("zookeeperSessionTimeout="+zookeeperSessionTimeout); ACL[] acl = (ACL[])zkConfig.getEntry @@ -970,5 +980,22 @@ } zookeeperAcl = Arrays.asList(acl); logger.debug("zookeeperAcl="+zookeeperAcl); + +//BTM - if config contains "servers" then by-pass dynamic discovery for now + zookeeperServers = + (String)zkConfig.getEntry + (zkComponent, "servers", String.class, null); + if(zookeeperServers == null) { + QuorumPeerManager tmpPeerMgr = + new QuorumPeerManager + (srvcDiscMgr, zookeeperSessionTimeout, logger); + if (tmpPeerMgr == null) { + throw new IOException("zookeeper ensemble unavailable"); + } + ZooKeeper.States zkState = tmpPeerMgr.getState(); + logger.debug("zookeeper state="+zkState); + zookeeperServers = tmpPeerMgr.getConnectString(); + } + logger.debug("zookeeperServers="+zookeeperServers); } } Modified: 
branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/logging.properties =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/logging.properties 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/logging.properties 2010-11-24 21:40:07 UTC (rev 3984) @@ -38,3 +38,4 @@ #log4j.logger.com.bigdata.metadata=DEBUG #log4j.logger.com.bigdata.metadata.EmbeddedShardLocator=DEBUG +#org.apache.zookeeper=DEBUG Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/shardlocator.config =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/shardlocator.config 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/metadata/config/shardlocator.config 2010-11-24 21:40:07 UTC (rev 3984) @@ -94,13 +94,10 @@ com.bigdata.metadata.serverILFactory, false, false); } -//NOTE: remove once dynamic discovery of zookeeper is added org.apache.zookeeper.ZooKeeper { zroot = ConfigDeployUtil.getString("federation.name"); - servers = com.bigdata.metadata.serverExporterIpAddr+":2888:3888"; - acl = new ACL[] { new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone")) }; Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/Constants.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/Constants.java 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/Constants.java 2010-11-24 21:40:07 UTC (rev 3984) @@ -95,7 +95,7 @@ int UPPER_BOUND_MAX_PARALLEL_TASKS_PER_REQUEST = 1000; int DEFAULT_MAX_PARALLEL_TASKS_PER_REQUEST = Integer.parseInt - (IBigdataClient.Options.DEFAULT_CLIENT_MAX_PARALLEL_TASKS_PER_REQUEST); + (IBigdataClient.Options.DEFAULT_CLIENT_MAX_PARALLEL_TASKS_PER_REQUEST); long 
LOWER_BOUND_TASK_TIMEOUT = 0; long UPPER_BOUND_TASK_TIMEOUT = Long.MAX_VALUE; @@ -121,4 +121,13 @@ Boolean.FALSE; // Boolean.parseBoolean // (IBigdataClient.Options.DEFAULT_COLLECT_PLATFORM_STATISTICS); + + // ZooKeeper client session timeout in seconds. Note that for the + // typical tick time of 2 seconds per tick, the session timeout + // should/will be set to a value between 4 and 40 seconds; because + // ZooKeeper requires that the session timeout always fall between + // 2 and 20 ticks. + int LOWER_BOUND_ZK_SESSION_TIMEOUT = 1; + int UPPER_BOUND_ZK_SESSION_TIMEOUT = Integer.MAX_VALUE; + int DEFAULT_UPPER_BOUND_ZK_SESSION_TIMEOUT = 40; } Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/ServiceImpl.java 2010-11-24 21:40:07 UTC (rev 3984) @@ -32,6 +32,7 @@ import com.bigdata.btree.ResultSet; import com.bigdata.btree.filter.IFilterConstructor; import com.bigdata.btree.proc.IIndexProcedure; +import com.bigdata.jini.quorum.QuorumPeerManager; import com.bigdata.jini.start.BigdataZooDefs; import com.bigdata.io.SerializerUtil; import com.bigdata.jini.util.ConfigMath; @@ -150,8 +151,6 @@ private EmbeddedShardService embeddedShardService; -//BTM private Thread waitThread; - /* Constructor used by Service Starter Framework to start this service */ public ServiceImpl(String[] args, LifeCycle lifeCycle) throws Exception { System.out.println("\nSSSSS SHARD SERVICE ServiceImpl: constructor"); @@ -456,11 +455,6 @@ serviceId = bootStateUtil.getServiceId(); logger.debug("smsProxyId = null - service generated & persisted " +"(or retreieved) its own proxy id ["+proxyId+"]"); - - setZookeeperConfigInfo(config); - zookeeperAccessor = - new ZooKeeperAccessor - (zookeeperServers, zookeeperSessionTimeout); } else 
{//ServicesConfiguration pre-generated the proxy id proxyId = smsProxyId; serviceId = com.bigdata.jini.util.JiniUtil.uuid2ServiceID(proxyId); @@ -621,6 +615,13 @@ UPPER_BOUND_HTTPD_PORT); this.sdm = new ServiceDiscoveryManager(ldm, null, config); + if (zookeeperAccessor == null) { + setZookeeperConfigInfo(config, this.sdm); + zookeeperAccessor = + new ZooKeeperAccessor + (zookeeperServers, zookeeperSessionTimeout); + } + embeddedShardService = new EmbeddedShardService (proxyId, hostname, @@ -663,9 +664,6 @@ +", locators=" +Util.writeArrayElementsToString(locatorsToJoin)); -//BTM waitThread = new Util.WaitOnInterruptThread(logger); -//BTM waitThread.start(); - readyState.ready();//ready to accept calls from clients } @@ -755,13 +753,14 @@ futureExporters.removeAll(removeSet); } -//BTM waitThread.interrupt(); -//BTM try { -//BTM waitThread.join(); -//BTM } catch (InterruptedException e) {/*exiting, so swallow*/} - + if (zookeeperAccessor != null) { + try { + zookeeperAccessor.close(); + } catch(InterruptedException e) {//swallow + } + } Util.cleanupOnExit - (innerProxy, serverExporter, futureExporters, joinMgr, sdm, ldm); + (innerProxy, serverExporter, futureExporters, joinMgr, sdm, ldm); // Tell the ServiceStarter framework it's ok to release for gc if(lifeCycle != null) { @@ -945,7 +944,22 @@ logger.debug("[main]: smsProxyId="+smsProxyId); } - setZookeeperConfigInfo(smsConfig); + String[] tmpGroups = + (String[])smsConfig.getEntry + ("com.bigdata.service.jini.JiniClient", "groups", + String[].class, DiscoveryGroupManagement.NO_GROUPS); + LookupLocator[] tmpLocs = + (LookupLocator[])smsConfig.getEntry + ("com.bigdata.service.jini.JiniClient", "locators", + LookupLocator[].class, new LookupLocator[]{ }); + DiscoveryManagement tmpLdm = + new LookupDiscoveryManager(tmpGroups, tmpLocs, null); + ServiceDiscoveryManager tmpSdm = + new ServiceDiscoveryManager(tmpLdm, null); + + setZookeeperConfigInfo(smsConfig, tmpSdm); + tmpLdm.terminate(); + tmpSdm.terminate(); 
zookeeperAccessor = new ZooKeeperAccessor (zookeeperServers, zookeeperSessionTimeout); @@ -968,7 +982,8 @@ ("[main]: logicalServiceZPath="+logicalServiceZPath); if(physicalServiceZPath != null) { byte[] data = SerializerUtil.serialize(smsEntries); - ZooKeeper zookeeperClient = zookeeperAccessor.getZookeeper(); + ZooKeeper zookeeperClient = + zookeeperAccessor.getZookeeper(); logger.debug("[main]: zookeeper client created"); try { zookeeperClient.create @@ -978,7 +993,7 @@ +"[physicalServiceZPath=" +physicalServiceZPath+"]"); } catch(NodeExistsException e) { - zookeeperClient.setData(physicalServiceZPath, data, -1); + zookeeperClient.setData(physicalServiceZPath,data,-1); logger.debug("[main]: zookeeper znode updated " +"[physicalServiceZPath=" +physicalServiceZPath+"]"); @@ -998,8 +1013,10 @@ } } - private static void setZookeeperConfigInfo(Configuration zkConfig) - throws ConfigurationException + private static void setZookeeperConfigInfo + (Configuration zkConfig, + ServiceDiscoveryManager srvcDiscMgr) + throws ConfigurationException, IOException { String zkComponent = "org.apache.zookeeper.ZooKeeper"; @@ -1011,18 +1028,11 @@ } logger.debug("zookeepeRoot="+zookeeperRoot); - zookeeperServers = - (String)zkConfig.getEntry - (zkComponent, "servers", String.class, null); - if(zookeeperServers == null) { - throw new ConfigurationException - ("zookeeper servers not specified"); - } - logger.debug("zookeeperServers="+zookeeperServers); - - zookeeperSessionTimeout = - (Integer)zkConfig.getEntry - (zkComponent, "sessionTimeout", int.class, 300000); + zookeeperSessionTimeout = + Config.getIntEntry(zkConfig, zkComponent, "sessionTimeout", + DEFAULT_UPPER_BOUND_ZK_SESSION_TIMEOUT, + LOWER_BOUND_ZK_SESSION_TIMEOUT, + UPPER_BOUND_ZK_SESSION_TIMEOUT); logger.debug("zookeeperSessionTimeout="+zookeeperSessionTimeout); ACL[] acl = (ACL[])zkConfig.getEntry @@ -1032,5 +1042,22 @@ } zookeeperAcl = Arrays.asList(acl); logger.debug("zookeeperAcl="+zookeeperAcl); + +//BTM - if config 
contains "servers" then by-pass dynamic discovery for now + zookeeperServers = + (String)zkConfig.getEntry + (zkComponent, "servers", String.class, null); + if(zookeeperServers == null) { + QuorumPeerManager tmpPeerMgr = + new QuorumPeerManager + (srvcDiscMgr, zookeeperSessionTimeout, logger); + if (tmpPeerMgr == null) { + throw new IOException("zookeeper ensemble unavailable"); + } + ZooKeeper.States zkState = tmpPeerMgr.getState(); + logger.debug("zookeeper state="+zkState); + zookeeperServers = tmpPeerMgr.getConnectString(); + } + logger.debug("zookeeperServers="+zookeeperServers); } } Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/logging.properties =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/logging.properties 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/logging.properties 2010-11-24 21:40:07 UTC (rev 3984) @@ -38,3 +38,4 @@ #log4j.logger.com.bigdata.shard=DEBUG #log4j.logger.com.bigdata.shard.EmbeddedShardService=DEBUG +#org.apache.zookeeper=DEBUG Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/shard.config =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/shard.config 2010-11-23 21:39:34 UTC (rev 3983) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/shard/config/shard.config 2010-11-24 21:40:07 UTC (rev 3984) @@ -94,13 +94,10 @@ com.bigdata.shard.serverILFactory, false, false); } -//NOTE: remove once dynamic discovery of zookeeper is added org.apache.zookeeper.ZooKeeper { zroot = ConfigDeployUtil.getString("federation.name"); - servers = com.bigdata.shard.serverExporterIpAddr+":2888:3888"; - acl = new ACL[] { new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone")) }; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source 
development site.