From: <mar...@us...> - 2014-04-03 09:48:22
|
Revision: 8032 http://sourceforge.net/p/bigdata/code/8032 Author: martyncutcher Date: 2014-04-03 09:48:19 +0000 (Thu, 03 Apr 2014) Log Message: ----------- Commit to allow branch to be added to CI. Note that this includes a delay added to AbstractJournal.gatherPhase() to support HA1 and which must be removed once the cause of its necessity is identified. Modified Paths: -------------- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -310,7 +310,7 @@ /** * When a record is used as a read cache then the readCount is - * maintained as a metric on its access. �This could be used to + * maintained as a metric on its access. ???This could be used to * determine eviction/compaction. * <p> * Note: volatile to guarantee visibility of updates. Might do better @@ -509,7 +509,8 @@ * @param isHighlyAvailable * when <code>true</code> the whole record checksum is maintained * for use when replicating the write cache along the write - * pipeline. + * pipeline. This needs to be <code>true</code> for HA1 as well + * since we need to write the HALog. * @param bufferHasData * when <code>true</code> the caller asserts that the buffer has * data (from a replicated write), in which case the position Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -1151,6 +1151,7 @@ done = WriteCache.transferTo(cache/* src */, curCompactingCache/* dst */, serviceMap, 0/*threshold*/); if (done) { + // Everything was compacted. Send just the address metadata (empty cache block). sendAddressMetadata(cache); if (log.isDebugEnabled()) @@ -1164,7 +1165,7 @@ */ if (flush) { /* - * Send out the full cache block. + * Send out the full cache block. FIXME Why are we not calling sendAddressMetadata() here? */ writeCacheBlock(curCompactingCache); addClean(curCompactingCache, true/* addFirst */); @@ -1231,7 +1232,7 @@ * been allocated on the leader in the same order in which the leader * made those allocations. This information is used to infer the order * in which the allocators for the different allocation slot sizes are - * created. This method will synchronous send those address notices and + * created. This method will synchronously send those address notices and * and also makes sure that the followers see the recycled addresses * records so they can keep both their allocators and the actual * allocations synchronized with the leader. @@ -1249,8 +1250,9 @@ throws IllegalStateException, InterruptedException, ExecutionException, IOException { - if (quorum == null || !quorum.isHighlyAvailable() - || !quorum.getClient().isLeader(quorumToken)) { +// if (quorum == null || !quorum.isHighlyAvailable() +// || !quorum.getClient().isLeader(quorumToken)) { + if (quorum == null) { return; } @@ -1354,7 +1356,7 @@ * unit tests need to be updated to specify [isHighlyAvailable] for * ALL quorum based test runs. */ - final boolean isHA = quorum != null && quorum.isHighlyAvailable(); + final boolean isHA = quorum != null; // IFF HA and this is the quorum leader. final boolean isHALeader = isHA @@ -1441,10 +1443,12 @@ quorumMember.logWriteCacheBlock(pkg.getMessage(), pkg.getData().duplicate()); // ASYNC MSG RMI + NIO XFER. - remoteWriteFuture = quorumMember.replicate(null/* req */, pkg.getMessage(), - pkg.getData().duplicate()); - - counters.get().nsend++; + if (quorum.replicationFactor() > 1) { + remoteWriteFuture = quorumMember.replicate(null/* req */, pkg.getMessage(), + pkg.getData().duplicate()); + + counters.get().nsend++; + } /* * The quorum leader logs the write cache block here. For the Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -2473,18 +2473,18 @@ } - /** - * Return <code>true</code> if the journal is configured for high - * availability. - * - * @see QuorumManager#isHighlyAvailable() - */ - public boolean isHighlyAvailable() { +// /** +// * Return <code>true</code> if the journal is configured for high +// * availability. +// * +// * @see Quorum#isHighlyAvailable() +// */ +// public boolean isHighlyAvailable() { +// +// return quorum == null ? false : quorum.isHighlyAvailable(); +// +// } - return quorum == null ? false : quorum.isHighlyAvailable(); - - } - /** * {@inheritDoc} * <p> @@ -3428,8 +3428,16 @@ if (quorum == null) return; -// if (!quorum.isHighlyAvailable()) -// return; + if (!quorum.isHighlyAvailable()) { + // FIXME: Find the reason why this delay is needed and remove it! + // + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return; + } /** * CRITICAL SECTION. We need obtain a distributed consensus for the @@ -3542,6 +3550,19 @@ // reload the commit record from the new root block. store._commitRecord = store._getCommitRecord(); + if (quorum != null) { + /* + * Write the root block on the HALog file, closing out that + * file. + */ + final QuorumService<HAGlue> localService = quorum.getClient(); + try { + localService.logRootBlock(newRootBlock); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (txLog.isInfoEnabled()) txLog.info("COMMIT: commitTime=" + commitTime); @@ -3792,7 +3813,7 @@ if (log.isInfoEnabled()) log.info("commitTime=" + commitTime); - final CommitState cs = new CommitState(this, commitTime); + final CommitState cs = new CommitState(this, commitTime); /* * Flush application data, decide whether or not the store is dirty, @@ -3808,6 +3829,7 @@ } // Do GATHER (iff HA). + cs.gatherPhase(); /* @@ -3846,12 +3868,12 @@ // Prepare the new root block. cs.newRootBlock(); - if (quorum == null) { + if (quorum == null || quorum.replicationFactor() == 1) { // Non-HA mode. cs.commitSimple(); - } else { + } else { // HA mode commit (2-phase commit). cs.commitHA(); Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -146,6 +146,7 @@ } + @Override public ByteBuffer read(final long addr) { try { Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -272,7 +272,7 @@ * which use this flag to conditionally track the checksum of the entire * write cache buffer). */ - private final boolean isHighlyAvailable; + private final boolean isQuorumUsed; /** * The {@link UUID} which identifies the journal (this is the same for each @@ -970,11 +970,11 @@ com.bigdata.journal.Options.HALOG_COMPRESSOR, com.bigdata.journal.Options.DEFAULT_HALOG_COMPRESSOR); - isHighlyAvailable = quorum != null && quorum.isHighlyAvailable(); + isQuorumUsed = quorum != null; // && quorum.isHighlyAvailable(); final boolean useWriteCacheService = fileMetadata.writeCacheEnabled && !fileMetadata.readOnly && fileMetadata.closeTime == 0L - || isHighlyAvailable; + || isQuorumUsed; if (useWriteCacheService) { /* @@ -1049,7 +1049,7 @@ final long fileExtent) throws InterruptedException { - super(baseOffset, buf, useChecksum, isHighlyAvailable, + super(baseOffset, buf, useChecksum, isQuorumUsed, bufferHasData, opener, fileExtent); } @@ -1379,6 +1379,7 @@ * to get the data from another node based on past experience for that * record. */ + @Override public ByteBuffer read(final long addr) { try { Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -688,7 +688,7 @@ throws InterruptedException { super(buf, useChecksum, m_quorum != null - && m_quorum.isHighlyAvailable(), bufferHasData, opener, + /*&& m_quorum.isHighlyAvailable()*/, bufferHasData, opener, fileExtent, m_bufferedWrite); @@ -1083,7 +1083,7 @@ final boolean highlyAvailable = m_quorum != null && m_quorum.isHighlyAvailable(); - final boolean prefixWrites = highlyAvailable; + final boolean prefixWrites = m_quorum != null; // highlyAvailable return new RWWriteCacheService(m_writeCacheBufferCount, m_minCleanListSize, m_readCacheBufferCount, prefixWrites, m_compactionThreshold, m_hotCacheSize, m_hotCacheThreshold, Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -80,8 +80,9 @@ final long fileExtent) throws InterruptedException { - final boolean highlyAvailable = getQuorum() != null - && getQuorum().isHighlyAvailable(); +// final boolean highlyAvailable = getQuorum() != null +// && getQuorum().isHighlyAvailable(); + final boolean highlyAvailable = getQuorum() != null; return new FileChannelScatteredWriteCache(buf, true/* useChecksum */, highlyAvailable, Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -184,7 +184,7 @@ // Verify journal can be dumped without error. dumpJournal(jnl); - + /* * Now roll that journal forward using the HALog directory. */ Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -56,7 +56,7 @@ "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)", "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.DefaultSnapshotPolicy("+neverRun+",0)", // "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()", - "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", + // "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", "com.bigdata.journal.jini.ha.HAJournalServer.replicationFactor=1", }; Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -36,8 +36,11 @@ import org.openrdf.model.impl.URIImpl; import org.openrdf.model.vocabulary.RDF; +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.QuorumService; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.IIndexManager; +import com.bigdata.quorum.Quorum; import com.bigdata.rdf.axioms.Axioms; import com.bigdata.rdf.axioms.NoAxioms; import com.bigdata.rdf.axioms.OwlAxioms; @@ -641,7 +644,10 @@ final AbstractJournal jnl = (AbstractJournal) indexManager; - if (jnl.isHighlyAvailable()) { + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = jnl + .getQuorum(); + + if (quorum != null && quorum.isHighlyAvailable()) { g.add(aService, SD.feature, HighlyAvailable); Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -55,10 +55,13 @@ import com.bigdata.bop.engine.QueryLog; import com.bigdata.bop.fed.QueryEngineFactory; import com.bigdata.counters.CounterSet; +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.QuorumService; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.DumpJournal; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.Journal; +import com.bigdata.quorum.Quorum; import com.bigdata.rdf.sail.sparql.ast.SimpleNode; import com.bigdata.rdf.sail.webapp.BigdataRDFContext.AbstractQueryTask; import com.bigdata.rdf.sail.webapp.BigdataRDFContext.RunningQuery; @@ -497,13 +500,18 @@ // final boolean showQuorum = req.getParameter(SHOW_QUORUM) != null; - if (getIndexManager() instanceof AbstractJournal - && ((AbstractJournal) getIndexManager()) - .isHighlyAvailable()) { + if (getIndexManager() instanceof AbstractJournal) { - new HAStatusServletUtil(getIndexManager()). - doGet(req, resp, current); + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = ((AbstractJournal) getIndexManager()) + .getQuorum(); + if (quorum != null && quorum.isHighlyAvailable()) { + + new HAStatusServletUtil(getIndexManager()).doGet(req, resp, + current); + + } + } current.node("br", "Accepted query count=" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |