From: <tho...@us...> - 2010-10-20 18:32:32
|
Revision: 3833 http://bigdata.svn.sourceforge.net/bigdata/?rev=3833&view=rev Author: thompsonbry Date: 2010-10-20 18:32:25 +0000 (Wed, 20 Oct 2010) Log Message: ----------- Modified the QueryEngineFactory to use a singleton pattern and modified the BigdataSail to NOT shutdown the QueryEngine when the BigdataSail is shutdown. This fixes a problem where the NanoSparqlServer was creating one QueryEngine per query. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/QueryEngineFactory.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/QueryEngineFactory.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/QueryEngineFactory.java 2010-10-20 18:31:10 UTC (rev 3832) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/QueryEngineFactory.java 2010-10-20 18:32:25 UTC (rev 3833) @@ -34,16 +34,18 @@ import java.util.UUID; import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.cache.ConcurrentWeakValueCache; import com.bigdata.journal.BufferMode; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.Journal; +import com.bigdata.service.IBigdataClient; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.ManagedResourceService; import com.bigdata.service.ResourceService; import com.bigdata.util.config.NicUtil; /** - * Factory for a query controller. + * Singleton factory for a query controller. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ @@ -51,36 +53,86 @@ public class QueryEngineFactory { /** - * New instance for standalone or scale-out. + * Weak value cache to enforce the singleton pattern for standalone + * journals. + */ + private static ConcurrentWeakValueCache<Journal, QueryEngine> standaloneQECache = new ConcurrentWeakValueCache<Journal, QueryEngine>(); + + /** + * Weak value cache to enforce the singleton pattern for + * {@link IBigdataClient}s (the data services are query engine peers rather + * than controllers and handle their own query engine initialization so as + * to expose their resources to other peers). + */ + private static ConcurrentWeakValueCache<IBigdataFederation<?>, FederatedQueryEngine> federationQECache = new ConcurrentWeakValueCache<IBigdataFederation<?>, FederatedQueryEngine>(); + + /** + * Singleton factory for standalone or scale-out. * * @param indexManager * The database. * * @return The query controller. */ - static public QueryEngine newQueryController(final IIndexManager indexManager) { + static public QueryEngine getQueryController(final IIndexManager indexManager) { if (indexManager instanceof IBigdataFederation<?>) { - return newFederatedQueryController((IBigdataFederation<?>) indexManager); + return getFederatedQueryController((IBigdataFederation<?>) indexManager); } - return newStandaloneQueryController((Journal) indexManager); + return getStandaloneQueryController((Journal) indexManager); } /** - * New query controller for standalone. + * Singleton factory for standalone. * * @param indexManager * The journal. * * @return The query controller. */ - static public QueryEngine newStandaloneQueryController( + static public QueryEngine getStandaloneQueryController( final Journal indexManager) { + if (indexManager == null) + throw new IllegalArgumentException(); + + QueryEngine queryEngine = standaloneQECache.get(indexManager); + + if (queryEngine == null) { + + synchronized (standaloneQECache) { + + if ((queryEngine = standaloneQECache.get(indexManager)) == null) { + + queryEngine = newStandaloneQueryEngine(indexManager); + + standaloneQECache.put(indexManager, queryEngine); + + } + + } + + } + + return queryEngine; + + } + + /** + * Initialize a new query engine for the journal. + * + * @param indexManager + * The journal. + * + * @return The new query engine. + */ + private static QueryEngine newStandaloneQueryEngine( + final Journal indexManager) { + final QueryEngine queryEngine = new QueryEngine(indexManager); queryEngine.init(); @@ -88,7 +140,7 @@ return queryEngine; } - + /** * New query controller for scale-out. * @@ -99,16 +151,53 @@ * * @todo parameterize the local resource service and temporary storage. */ - static public FederatedQueryEngine newFederatedQueryController( + static public FederatedQueryEngine getFederatedQueryController( final IBigdataFederation<?> fed) { + if (fed == null) + throw new IllegalArgumentException(); + + FederatedQueryEngine queryEngine = federationQECache.get(fed); + + if (queryEngine == null) { + + synchronized (federationQECache) { + + if ((queryEngine = federationQECache.get(fed)) == null) { + + queryEngine = newFederatedQueryEngine(fed); + + federationQECache.put(fed, queryEngine); + + } + + } + + } + + return queryEngine; + + } + + /** + * Initialize a new query engine for the federation. + * + * @param fed + * The federation. + * + * @return The new query engine. + */ + private static FederatedQueryEngine newFederatedQueryEngine( + final IBigdataFederation<?> fed) { + + final FederatedQueryEngine queryEngine; + // The local resource service for the query controller. ManagedResourceService queryEngineResourceService = null; // The local persistence store for the query controller. Journal queryEngineStore = null; - final FederatedQueryEngine queryEngine; try { // Create index manager for the query controller. @@ -116,10 +205,11 @@ final Properties p = new Properties(); - p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Temporary - .toString()); + p.setProperty(Journal.Options.BUFFER_MODE, + BufferMode.Temporary.toString()); - p.setProperty(Journal.Options.CREATE_TEMP_FILE, "true"); + p.setProperty(Journal.Options.CREATE_TEMP_FILE, + "true"); queryEngineStore = new Journal(p); @@ -129,12 +219,14 @@ { queryEngineResourceService = new ManagedResourceService( new InetSocketAddress(InetAddress - .getByName(NicUtil.getIpAddress("default.nic", - "default", true/* loopbackOk */)), 0/* port */ + .getByName(NicUtil.getIpAddress( + "default.nic", "default", + true/* loopbackOk */)), 0/* port */ ), 0/* requestServicePoolSize */) { @Override - protected File getResource(UUID uuid) throws Exception { + protected File getResource(UUID uuid) + throws Exception { // Will not serve up files. return null; } @@ -142,8 +234,9 @@ } // create the query controller. - queryEngine = new FederatedQueryController(fed.getServiceUUID(), - fed, queryEngineStore, queryEngineResourceService); + queryEngine = new FederatedQueryController(fed + .getServiceUUID(), fed, queryEngineStore, + queryEngineResourceService); } catch (Throwable t) { @@ -160,9 +253,9 @@ queryEngine.init(); return queryEngine; - + } - + /** * Implementation manages its own local storage and resource service. */ Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-20 18:31:10 UTC (rev 3832) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-20 18:32:25 UTC (rev 3833) @@ -923,7 +923,7 @@ namespaces = Collections.synchronizedMap(new LinkedHashMap<String, String>()); - queryEngine = QueryEngineFactory.newQueryController(database + queryEngine = QueryEngineFactory.getQueryController(database .getIndexManager()); } @@ -996,9 +996,14 @@ public void shutDown() throws SailException { assertOpen(); + + /* + * Note: DO NOT shutdown the query engine. It is shared by all + * operations against the same backing Journal or IBigdataFederation + * within this JVM! + */ +// queryEngine.shutdown(); - queryEngine.shutdown(); - super.shutDown(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |