From: <tho...@us...> - 2012-10-12 18:46:56
|
Revision: 6674 http://bigdata.svn.sourceforge.net/bigdata/?rev=6674&view=rev Author: thompsonbry Date: 2012-10-12 18:46:50 +0000 (Fri, 12 Oct 2012) Log Message: ----------- Modified the NSS to start immediately when the HAJournalServer starts. The NSS now submits a task that handles the default KB create. For HA, that task will block until the quorum meets, but the NSS is not blocked and will begin responding immediately. Of course, until the quorum meets, you cannot do much, but at least the home page is up. You CAN use the status page as well to examine the state of the quorum. The NSS will respond to SPARQL QUERY and UPDATE requests with "Quorum is not met" until the quorum meets, at which point that task will create the default KB. https://sourceforge.net/apps/trac/bigdata/ticket/530 (Journal HA) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/CreateKBTask.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-10-12 14:56:00 UTC (rev 6673) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-10-12 18:46:50 UTC (rev 6674) @@ -4623,67 +4623,6 @@ * <p> * Access to this field is protected by the {@link #_fieldReadWriteLock} but * MUST also be coordinated as described above. 
- * - * FIXME HA: Many methods need to be modified to (a) await a quorum if there is - * none; and (b) handle a change in the quorum token. When a quorum breaks, - * these operations can block until a new quorum meets. As long as the - * master has not changed, the master's state remains valid and the other - * nodes can be brought into synchronization (they are synchronized if they - * are at the same message count for the write pipeline). If the master - * fails, then the new master will not have the same buffered write set and - * the outstanding operations (both unisolated and read/write tx) must about - * so we can resume from a known good commit point (this is true even if the - * secondaries remain active since (a) we do not have a strong guarantee - * that the new master is at the same message count on the write pipeline as - * the old master; and (b) transaction write sets are buffered within the - * JVM and/or disk on the master and are not available to the secondaries on - * failover. - * - * @see #read(long) - * @see #write(ByteBuffer) - * @see #write(ByteBuffer, long) - * @see #delete(long) - * @see #closeForWrites(long) However, some methods probably should be - * allowed to proceed without a quorum (or a node must be permitted to - * disconnect permanently from a quorum). E.g.: - * - * @see #close() - * @see #destroy() Another interesting question is whether we have to be in - * a quorum to create a new journal. I would say, "yes" for a data - * service. The atomic cutover to a new journal should only be taken by - * a quorum. - * <p> - * Probably you need to be in a quorum to create an HA journal (outside - * of a data service) as well. That would appear to be necessary in - * order to create the same initial root blocks on each node. In that - * case we have a bootstrapping problem for new HA journals. 
Either - * they must have a quorum before hand and go through a coordinated - * "commit" protocol for the journal create or they should be created - * first, then negotiate the quorum membership and who is the master - * and then resynchronize before the journal comes on line. - * - * @todo maintain readOnly flag based on whether or not the QuorumService - * was the leader the last time the quorum met and whether the token - * is still valid. use a lightweight test for a valid token to avoid - * lock contention. - * - * @todo readers should not block and await a quorum unless we are willing - * to handle the case where this QuorumService is not joined with the - * met quorum either by reading on the quorum or by throwing an - * exception back to the application. - * - * @todo In HA mode, the followers are read-only. However, that is a - * high-level constraint on application writes. For synchronization, - * we need to be able to write on the journal even when it is not - * joined with the quorum. The WriteCacheService will refuse to relay - * writes if the service is not the leader of a met quorum. - * <p> - * For example, it would be appropriate to impose the constraint on - * which nodes can be read or written at a load balancer at the SAIL - * layer. 
- * - * if (quorum.isHighlyAvailable() && quorum.isQuorumMet() && - * quorum.getClient().isFollower(quorumToken)) { return true; } */ private volatile long quorumToken = Quorum.NO_QUORUM; Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2012-10-12 14:56:00 UTC (rev 6673) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2012-10-12 18:46:50 UTC (rev 6674) @@ -64,7 +64,6 @@ import com.bigdata.journal.Journal; import com.bigdata.journal.ValidationError; import com.bigdata.journal.WORMStrategy; -import com.bigdata.journal.jini.ha.HAJournalServer.NSSConfigurationOptions; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.zk.ZKQuorumImpl; import com.bigdata.rwstore.RWStore; Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-10-12 14:56:00 UTC (rev 6673) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-10-12 18:46:50 UTC (rev 6674) @@ -505,6 +505,21 @@ } /* + * The NSS will start on each service in the quorum. However, + * only the leader will create the default KB (if that option is + * configured). + */ + try { + + startNSS(); + + } catch (Exception e1) { + + log.error("Could not start NanoSparqlServer: " + e1, e1); + + } + + /* * Wait until the server is terminated. */ @@ -754,31 +769,6 @@ } - if (isJoinedMember(token) && server.jettyServer == null) { - /* - * The NSS will start on each service in the quorum. 
However, - * only the leader will create the default KB (if that option is - * configured). - * - * Submit task since we can not do this in the event thread. - */ - journal.getExecutorService().submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - if (server.jettyServer == null) { - try { - server.startNSS(); - } catch (Exception e1) { - log.error("Could not start NanoSparqlServer: " - + e1, e1); - } - } - return null; - } - }); - - } - } @Override @@ -1695,6 +1685,27 @@ } } + +// /** +// * Conditionally create the default KB instance. +// * +// * @see NSSConfigurationOptions +// * +// * @throws Exception +// */ +// private void conditionalCreateDefaultKB() throws Exception { +// +// final String COMPONENT = NSSConfigurationOptions.COMPONENT; +// +// final String namespace = (String) config.getEntry(COMPONENT, +// NSSConfigurationOptions.NAMESPACE, String.class, +// NSSConfigurationOptions.DEFAULT_NAMESPACE); +// +// final boolean create = (Boolean) config.getEntry(COMPONENT, +// NSSConfigurationOptions.CREATE, Boolean.TYPE, +// NSSConfigurationOptions.DEFAULT_CREATE); +// +// } /** * Setup and start the {@link NanoSparqlServer}. @@ -1734,9 +1745,16 @@ initParams.put(ConfigParams.QUERY_THREAD_POOL_SIZE, queryPoolThreadSize.toString()); + // Note: Create will be handled by the QuorumListener (above). initParams.put(ConfigParams.CREATE, Boolean.toString(create)); } + + if (jettyServer != null && jettyServer.isRunning()) { + + throw new RuntimeException("Already running"); + + } // Setup the embedded jetty server for NSS webapp. 
jettyServer = NanoSparqlServer.newInstance(port, journal, initParams); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java 2012-10-12 14:56:00 UTC (rev 6673) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java 2012-10-12 18:46:50 UTC (rev 6674) @@ -48,24 +48,18 @@ import com.bigdata.counters.CounterSet; import com.bigdata.counters.ICounterSetAccess; import com.bigdata.counters.IProcessCounters; -import com.bigdata.ha.HAGlue; -import com.bigdata.ha.QuorumService; import com.bigdata.io.DirectBufferPool; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.ITransactionService; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; -import com.bigdata.quorum.AsynchronousQuorumCloseException; -import com.bigdata.quorum.Quorum; import com.bigdata.rdf.ServiceProviderHook; import com.bigdata.rdf.sail.BigdataSail; -import com.bigdata.rdf.store.ScaleOutTripleStore; import com.bigdata.service.AbstractDistributedFederation; import com.bigdata.service.DefaultClientDelegate; import com.bigdata.service.IBigdataClient; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.jini.JiniClient; -import com.bigdata.service.jini.JiniFederation; import com.bigdata.util.httpd.AbstractHTTPD; /** @@ -78,7 +72,7 @@ public class BigdataRDFServletContextListener implements ServletContextListener { - static private final transient Logger log = Logger + private static final transient Logger log = Logger .getLogger(BigdataRDFServletContextListener.class); private Journal jnl = null; @@ -179,93 +173,14 @@ if (create) { - if (indexManager instanceof Journal) { + /* + * Note: Nobody is watching this future. 
The task will log any + * errors. + */ - /* - * Create a local triple store. - * - * Note: This hands over the logic to some custom code located - * on the BigdataSail. - */ + indexManager.getExecutorService().submit( + new CreateKBTask(indexManager, namespace)); - final Journal jnl = (Journal) indexManager; - - final Quorum<HAGlue, QuorumService<HAGlue>> quorum = jnl - .getQuorum(); - - boolean isSoloOrLeader; - if (quorum == null) { - - isSoloOrLeader = true; - - } else { - - final long token; - try { - log.warn("Awaiting quorum."); - token = quorum.awaitQuorum(); - } catch (AsynchronousQuorumCloseException e1) { - throw new RuntimeException(e1); - } catch (InterruptedException e1) { - throw new RuntimeException(e1); - } - - if (quorum.getMember().isLeader(token)) { - isSoloOrLeader = true; - } else { - isSoloOrLeader = false; - } - } - - if (isSoloOrLeader) { - - // Attempt to resolve the namespace. - if (indexManager.getResourceLocator().locate(namespace, - ITx.UNISOLATED) == null) { - - log.warn("Creating KB instance: namespace=" + namespace); - - final Properties properties = new Properties( - jnl.getProperties()); - - // override the namespace. - properties.setProperty(BigdataSail.Options.NAMESPACE, - namespace); - - // create the appropriate as configured triple/quad - // store. - BigdataSail.createLTS(jnl, properties); - - } // if( tripleStore == null ) - - } - - } else { - - // Attempt to resolve the namespace. - if (indexManager.getResourceLocator().locate(namespace, - ITx.UNISOLATED) == null) { - - /* - * Register triple store for scale-out. 
- */ - - log.warn("Creating KB instance: namespace=" + namespace); - - final JiniFederation<?> fed = (JiniFederation<?>) indexManager; - - final Properties properties = fed.getClient() - .getProperties(); - - final ScaleOutTripleStore lts = new ScaleOutTripleStore( - indexManager, namespace, ITx.UNISOLATED, properties); - - lts.create(); - - } // if( tripleStore == null ) - - } - } // if( create ) txs = (indexManager instanceof Journal ? ((Journal) indexManager) Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/CreateKBTask.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/CreateKBTask.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/CreateKBTask.java 2012-10-12 18:46:50 UTC (rev 6674) @@ -0,0 +1,151 @@ +package com.bigdata.rdf.sail.webapp; + +import java.util.Properties; +import java.util.concurrent.Callable; + +import org.apache.log4j.Logger; + +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.QuorumService; +import com.bigdata.journal.AbstractJournal; +import com.bigdata.journal.IIndexManager; +import com.bigdata.journal.ITx; +import com.bigdata.journal.Journal; +import com.bigdata.quorum.AsynchronousQuorumCloseException; +import com.bigdata.quorum.Quorum; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.store.ScaleOutTripleStore; +import com.bigdata.service.jini.JiniFederation; + +/** + * Task creates a KB for the given namespace iff no such KB exists. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class CreateKBTask implements Callable<Void> { + + private static final transient Logger log = Logger + .getLogger(BigdataRDFServletContextListener.class); + + private final IIndexManager indexManager; + private final String namespace; + + public CreateKBTask(final IIndexManager indexManager, + final String namespace) { + + this.indexManager = indexManager; + this.namespace = namespace; + + } + + public Void call() throws Exception { + + try { + + doRun(); + + } catch (Throwable t) { + + log.error(t, t); + + throw new Exception(t); + + } + + return null; + + } + + private void doRun() { + + if (indexManager instanceof AbstractJournal) { + + /* + * Create a local triple store. + * + * Note: This hands over the logic to some custom code located + * on the BigdataSail. + */ + + final Journal jnl = (Journal) indexManager; + + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = jnl + .getQuorum(); + + boolean isSoloOrLeader; + + if (quorum == null) { + + isSoloOrLeader = true; + + } else { + + final long token; + try { + log.warn("Awaiting quorum."); + token = quorum.awaitQuorum(); + } catch (AsynchronousQuorumCloseException e1) { + throw new RuntimeException(e1); + } catch (InterruptedException e1) { + throw new RuntimeException(e1); + } + + if (quorum.getMember().isLeader(token)) { + isSoloOrLeader = true; + } else { + isSoloOrLeader = false; + } + } + + if (isSoloOrLeader) { + + // Attempt to resolve the namespace. + if (indexManager.getResourceLocator().locate(namespace, + ITx.UNISOLATED) == null) { + + log.warn("Creating KB instance: namespace=" + namespace); + + final Properties properties = new Properties( + jnl.getProperties()); + + // override the namespace. + properties.setProperty(BigdataSail.Options.NAMESPACE, + namespace); + + // create the appropriate as configured triple/quad + // store. 
+ BigdataSail.createLTS(jnl, properties); + + } // if( tripleStore == null ) + + } + + } else { + + // Attempt to resolve the namespace. + if (indexManager.getResourceLocator().locate(namespace, + ITx.UNISOLATED) == null) { + + /* + * Register triple store for scale-out. + */ + + log.warn("Creating KB instance: namespace=" + namespace); + + final JiniFederation<?> fed = (JiniFederation<?>) indexManager; + + final Properties properties = fed.getClient() + .getProperties(); + + final ScaleOutTripleStore lts = new ScaleOutTripleStore( + indexManager, namespace, ITx.UNISOLATED, properties); + + lts.create(); + + } // if( tripleStore == null ) + + } + + } + +} \ No newline at end of file This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |