|
From: <mar...@us...> - 2014-04-03 09:48:22
|
Revision: 8032
http://sourceforge.net/p/bigdata/code/8032
Author: martyncutcher
Date: 2014-04-03 09:48:19 +0000 (Thu, 03 Apr 2014)
Log Message:
-----------
Commit to allow branch to be added to CI. Note that this includes a delay added to AbstractJournal.gatherPhase() to support HA1 and which must be removed once the cause of its necessity is identified.
Modified Paths:
--------------
branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java
branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java
branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java
branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java
branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java
branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2014-04-03 00:41:27 UTC (rev 8031)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2014-04-03 09:48:19 UTC (rev 8032)
@@ -310,7 +310,7 @@
/**
* When a record is used as a read cache then the readCount is
- * maintained as a metric on its access. �This could be used to
+ * maintained as a metric on its access. ???This could be used to
* determine eviction/compaction.
* <p>
* Note: volatile to guarantee visibility of updates. Might do better
@@ -509,7 +509,8 @@
* @param isHighlyAvailable
* when <code>true</code> the whole record checksum is maintained
* for use when replicating the write cache along the write
- * pipeline.
+ * pipeline. This needs to be <code>true</code> for HA1 as well
+ * since we need to write the HALog.
* @param bufferHasData
* when <code>true</code> the caller asserts that the buffer has
* data (from a replicated write), in which case the position
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2014-04-03 00:41:27 UTC (rev 8031)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2014-04-03 09:48:19 UTC (rev 8032)
@@ -1151,6 +1151,7 @@
done = WriteCache.transferTo(cache/* src */,
curCompactingCache/* dst */, serviceMap, 0/*threshold*/);
if (done) {
+ // Everything was compacted. Send just the address metadata (empty cache block).
sendAddressMetadata(cache);
if (log.isDebugEnabled())
@@ -1164,7 +1165,7 @@
*/
if (flush) {
/*
- * Send out the full cache block.
+ * Send out the full cache block. FIXME Why are we not calling sendAddressMetadata() here?
*/
writeCacheBlock(curCompactingCache);
addClean(curCompactingCache, true/* addFirst */);
@@ -1231,7 +1232,7 @@
* been allocated on the leader in the same order in which the leader
* made those allocations. This information is used to infer the order
* in which the allocators for the different allocation slot sizes are
- * created. This method will synchronous send those address notices and
+ * created. This method will synchronously send those address notices and
* and also makes sure that the followers see the recycled addresses
* records so they can keep both their allocators and the actual
* allocations synchronized with the leader.
@@ -1249,8 +1250,9 @@
throws IllegalStateException, InterruptedException,
ExecutionException, IOException {
- if (quorum == null || !quorum.isHighlyAvailable()
- || !quorum.getClient().isLeader(quorumToken)) {
+// if (quorum == null || !quorum.isHighlyAvailable()
+// || !quorum.getClient().isLeader(quorumToken)) {
+ if (quorum == null) {
return;
}
@@ -1354,7 +1356,7 @@
* unit tests need to be updated to specify [isHighlyAvailable] for
* ALL quorum based test runs.
*/
- final boolean isHA = quorum != null && quorum.isHighlyAvailable();
+ final boolean isHA = quorum != null;
// IFF HA and this is the quorum leader.
final boolean isHALeader = isHA
@@ -1441,10 +1443,12 @@
quorumMember.logWriteCacheBlock(pkg.getMessage(), pkg.getData().duplicate());
// ASYNC MSG RMI + NIO XFER.
- remoteWriteFuture = quorumMember.replicate(null/* req */, pkg.getMessage(),
- pkg.getData().duplicate());
-
- counters.get().nsend++;
+ if (quorum.replicationFactor() > 1) {
+ remoteWriteFuture = quorumMember.replicate(null/* req */, pkg.getMessage(),
+ pkg.getData().duplicate());
+
+ counters.get().nsend++;
+ }
/*
* The quorum leader logs the write cache block here. For the
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-03 00:41:27 UTC (rev 8031)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-03 09:48:19 UTC (rev 8032)
@@ -2473,18 +2473,18 @@
}
- /**
- * Return <code>true</code> if the journal is configured for high
- * availability.
- *
- * @see QuorumManager#isHighlyAvailable()
- */
- public boolean isHighlyAvailable() {
+// /**
+// * Return <code>true</code> if the journal is configured for high
+// * availability.
+// *
+// * @see Quorum#isHighlyAvailable()
+// */
+// public boolean isHighlyAvailable() {
+//
+// return quorum == null ? false : quorum.isHighlyAvailable();
+//
+// }
- return quorum == null ? false : quorum.isHighlyAvailable();
-
- }
-
/**
* {@inheritDoc}
* <p>
@@ -3428,8 +3428,16 @@
if (quorum == null)
return;
-// if (!quorum.isHighlyAvailable())
-// return;
+ if (!quorum.isHighlyAvailable()) {
+ // FIXME: Find the reason why this delay is needed and remove it!
+ //
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return;
+ }
/**
* CRITICAL SECTION. We need obtain a distributed consensus for the
@@ -3542,6 +3550,19 @@
// reload the commit record from the new root block.
store._commitRecord = store._getCommitRecord();
+ if (quorum != null) {
+ /*
+ * Write the root block on the HALog file, closing out that
+ * file.
+ */
+ final QuorumService<HAGlue> localService = quorum.getClient();
+ try {
+ localService.logRootBlock(newRootBlock);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
if (txLog.isInfoEnabled())
txLog.info("COMMIT: commitTime=" + commitTime);
@@ -3792,7 +3813,7 @@
if (log.isInfoEnabled())
log.info("commitTime=" + commitTime);
- final CommitState cs = new CommitState(this, commitTime);
+ final CommitState cs = new CommitState(this, commitTime);
/*
* Flush application data, decide whether or not the store is dirty,
@@ -3808,6 +3829,7 @@
}
// Do GATHER (iff HA).
+
cs.gatherPhase();
/*
@@ -3846,12 +3868,12 @@
// Prepare the new root block.
cs.newRootBlock();
- if (quorum == null) {
+ if (quorum == null || quorum.replicationFactor() == 1) {
// Non-HA mode.
cs.commitSimple();
- } else {
+ } else {
// HA mode commit (2-phase commit).
cs.commitHA();
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2014-04-03 00:41:27 UTC (rev 8031)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2014-04-03 09:48:19 UTC (rev 8032)
@@ -146,6 +146,7 @@
}
+ @Override
public ByteBuffer read(final long addr) {
try {
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2014-04-03 00:41:27 UTC (rev 8031)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2014-04-03 09:48:19 UTC (rev 8032)
@@ -272,7 +272,7 @@
* which use this flag to conditionally track the checksum of the entire
* write cache buffer).
*/
- private final boolean isHighlyAvailable;
+ private final boolean isQuorumUsed;
/**
* The {@link UUID} which identifies the journal (this is the same for each
@@ -970,11 +970,11 @@
com.bigdata.journal.Options.HALOG_COMPRESSOR,
com.bigdata.journal.Options.DEFAULT_HALOG_COMPRESSOR);
- isHighlyAvailable = quorum != null && quorum.isHighlyAvailable();
+ isQuorumUsed = quorum != null; // && quorum.isHighlyAvailable();
final boolean useWriteCacheService = fileMetadata.writeCacheEnabled
&& !fileMetadata.readOnly && fileMetadata.closeTime == 0L
- || isHighlyAvailable;
+ || isQuorumUsed;
if (useWriteCacheService) {
/*
@@ -1049,7 +1049,7 @@
final long fileExtent)
throws InterruptedException {
- super(baseOffset, buf, useChecksum, isHighlyAvailable,
+ super(baseOffset, buf, useChecksum, isQuorumUsed,
bufferHasData, opener, fileExtent);
}
@@ -1379,6 +1379,7 @@
* to get the data from another node based on past experience for that
* record.
*/
+ @Override
public ByteBuffer read(final long addr) {
try {
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-04-03 00:41:27 UTC (rev 8031)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-04-03 09:48:19 UTC (rev 8032)
@@ -688,7 +688,7 @@
throws InterruptedException {
super(buf, useChecksum, m_quorum != null
- && m_quorum.isHighlyAvailable(), bufferHasData, opener,
+ /*&& m_quorum.isHighlyAvailable()*/, bufferHasData, opener,
fileExtent,
m_bufferedWrite);
@@ -1083,7 +1083,7 @@
final boolean highlyAvailable = m_quorum != null
&& m_quorum.isHighlyAvailable();
- final boolean prefixWrites = highlyAvailable;
+ final boolean prefixWrites = m_quorum != null; // highlyAvailable
return new RWWriteCacheService(m_writeCacheBufferCount,
m_minCleanListSize, m_readCacheBufferCount, prefixWrites, m_compactionThreshold, m_hotCacheSize, m_hotCacheThreshold,
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java 2014-04-03 00:41:27 UTC (rev 8031)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java 2014-04-03 09:48:19 UTC (rev 8032)
@@ -80,8 +80,9 @@
final long fileExtent)
throws InterruptedException {
- final boolean highlyAvailable = getQuorum() != null
- && getQuorum().isHighlyAvailable();
+// final boolean highlyAvailable = getQuorum() != null
+// && getQuorum().isHighlyAvailable();
+ final boolean highlyAvailable = getQuorum() != null;
return new FileChannelScatteredWriteCache(buf, true/* useChecksum */,
highlyAvailable,
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java 2014-04-03 00:41:27 UTC (rev 8031)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java 2014-04-03 09:48:19 UTC (rev 8032)
@@ -184,7 +184,7 @@
// Verify journal can be dumped without error.
dumpJournal(jnl);
-
+
/*
* Now roll that journal forward using the HALog directory.
*/
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java 2014-04-03 00:41:27 UTC (rev 8031)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java 2014-04-03 09:48:19 UTC (rev 8032)
@@ -56,7 +56,7 @@
"com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)",
"com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.DefaultSnapshotPolicy("+neverRun+",0)",
// "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()",
- "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true",
+ // "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true",
"com.bigdata.journal.jini.ha.HAJournalServer.replicationFactor=1",
};
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java 2014-04-03 00:41:27 UTC (rev 8031)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java 2014-04-03 09:48:19 UTC (rev 8032)
@@ -36,8 +36,11 @@
import org.openrdf.model.impl.URIImpl;
import org.openrdf.model.vocabulary.RDF;
+import com.bigdata.ha.HAGlue;
+import com.bigdata.ha.QuorumService;
import com.bigdata.journal.AbstractJournal;
import com.bigdata.journal.IIndexManager;
+import com.bigdata.quorum.Quorum;
import com.bigdata.rdf.axioms.Axioms;
import com.bigdata.rdf.axioms.NoAxioms;
import com.bigdata.rdf.axioms.OwlAxioms;
@@ -641,7 +644,10 @@
final AbstractJournal jnl = (AbstractJournal) indexManager;
- if (jnl.isHighlyAvailable()) {
+ final Quorum<HAGlue, QuorumService<HAGlue>> quorum = jnl
+ .getQuorum();
+
+ if (quorum != null && quorum.isHighlyAvailable()) {
g.add(aService, SD.feature, HighlyAvailable);
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2014-04-03 00:41:27 UTC (rev 8031)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2014-04-03 09:48:19 UTC (rev 8032)
@@ -55,10 +55,13 @@
import com.bigdata.bop.engine.QueryLog;
import com.bigdata.bop.fed.QueryEngineFactory;
import com.bigdata.counters.CounterSet;
+import com.bigdata.ha.HAGlue;
+import com.bigdata.ha.QuorumService;
import com.bigdata.journal.AbstractJournal;
import com.bigdata.journal.DumpJournal;
import com.bigdata.journal.IIndexManager;
import com.bigdata.journal.Journal;
+import com.bigdata.quorum.Quorum;
import com.bigdata.rdf.sail.sparql.ast.SimpleNode;
import com.bigdata.rdf.sail.webapp.BigdataRDFContext.AbstractQueryTask;
import com.bigdata.rdf.sail.webapp.BigdataRDFContext.RunningQuery;
@@ -497,13 +500,18 @@
// final boolean showQuorum = req.getParameter(SHOW_QUORUM) != null;
- if (getIndexManager() instanceof AbstractJournal
- && ((AbstractJournal) getIndexManager())
- .isHighlyAvailable()) {
+ if (getIndexManager() instanceof AbstractJournal) {
- new HAStatusServletUtil(getIndexManager()).
- doGet(req, resp, current);
+ final Quorum<HAGlue, QuorumService<HAGlue>> quorum = ((AbstractJournal) getIndexManager())
+ .getQuorum();
+ if (quorum != null && quorum.isHighlyAvailable()) {
+
+ new HAStatusServletUtil(getIndexManager()).doGet(req, resp,
+ current);
+
+ }
+
}
current.node("br", "Accepted query count="
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|