From: <tho...@us...> - 2013-10-30 19:19:49
Revision: 7501
http://bigdata.svn.sourceforge.net/bigdata/?rev=7501&view=rev
Author: thompsonbry
Date: 2013-10-30 19:19:39 +0000 (Wed, 30 Oct 2013)
Log Message:
-----------
I have reviewed and revised the following aspects of the commit and 2-phase commit protocol:
- Journal: cancellation of the Future in the GATHER consensus protocol is no longer logged as an ERROR. This is just a data race.
- HAJournalServer: enterErrorState() is now written to be defensive. Nothing will be thrown out of it, even if the HAQuorumService is not running (quorum.terminate()).
- HAJournalServer.HAQuorumService.start()/terminate() now use an AtomicBoolean to guard those methods and to make enterErrorState() a NOP if the HAQuorumService is currently in either of those methods.
- HAJournalServer.logRootBlock() no longer has the isJoinedService boolean. This method is only called in the 2-phase commit logic and the 2-phase commit is only executed for services that were joined with the met quorum as of the atomic decision point in commitNow().
- commitNow(): refactored to use a CommitState object, and the individual steps of the commit were pushed down into methods on that object. The error handling for the 2-phase commit logic was simplified to make it easier to understand.
- The Prepare2Phase and Commit2Phase tasks were simplified. The core code was pushed down into private inner methods. This makes it easier to analyze the error handling code paths.
- TestDumpJournal: added coverage of some more conditions in an attempt to replicate an error observed on the HA3 cluster. I was not able to replicate the problem. It may have been related to allocator recycling or to an abnormal failure mode on bigdata17, as per this ticket:
{{{
ERROR: 2885231 2013-10-28 16:05:28,194 qtp230584058-49 com.bigdata.rdf.sail.webapp.StatusServlet.doGet(StatusServlet.java:863): java.lang.RuntimeException: java.lang.ClassCastException: com.bigdata.btree.BTree cannot be cast to com.bigdata.journal.Name2Addr
java.lang.RuntimeException: java.lang.ClassCastException: com.bigdata.btree.BTree cannot be cast to com.bigdata.journal.Name2Addr
at com.bigdata.journal.DumpJournal.dumpNamedIndicesMetadata(DumpJournal.java:726)
at com.bigdata.journal.DumpJournal.dumpJournal(DumpJournal.java:599)
at com.bigdata.rdf.sail.webapp.StatusServlet.doGet(StatusServlet.java:485)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:534)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:475)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:929)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:403)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:864)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:117)
at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:47)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:114)
at org.eclipse.jetty.server.Server.handle(Server.java:352)
at org.eclipse.jetty.server.HttpConnection.handleRequest(HttpConnection.java:596)
at org.eclipse.jetty.server.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:1051)
at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:590)
at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:212)
at org.eclipse.jetty.server.HttpConnection.handle(HttpConnection.java:426)
at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:508)
at org.eclipse.jetty.io.nio.SelectChannelEndPoint.access$000(SelectChannelEndPoint.java:34)
at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:40)
at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:451)
at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.ClassCastException: com.bigdata.btree.BTree cannot be cast to com.bigdata.journal.Name2Addr
at com.bigdata.journal.AbstractJournal.getIndexWithCommitRecord(AbstractJournal.java:4751)
at com.bigdata.journal.DumpJournal.dumpNamedIndicesMetadata(DumpJournal.java:706)
... 23 more
}}}
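The first change above stops reporting a cancelled GATHER Future as an ERROR, while genuine failures are still reported. A minimal sketch of that distinction, assuming a plain java.util.concurrent.Future and java.util.logging (bigdata itself logs through log4j); GatherOutcome and its Result enum are hypothetical names, not bigdata classes:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper (not part of bigdata): classify how a GATHER-style Future ended. */
final class GatherOutcome {
    private static final Logger log = Logger.getLogger(GatherOutcome.class.getName());

    enum Result { DONE, CANCELLED, FAILED, INTERRUPTED }

    private GatherOutcome() {}

    static Result await(Future<?> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);
            return Result.DONE;
        } catch (CancellationException ex) {
            // Expected data race: the protocol was torn down around this task. Not an ERROR.
            log.log(Level.FINE, "GATHER future was cancelled", ex);
            return Result.CANCELLED;
        } catch (InterruptedException ex) {
            // Preserve the caller's interrupt status.
            Thread.currentThread().interrupt();
            return Result.INTERRUPTED;
        } catch (ExecutionException | TimeoutException ex) {
            // Genuine failures are still reported at SEVERE (ERROR) level.
            log.log(Level.SEVERE, "GATHER failed", ex);
            return Result.FAILED;
        }
    }
}
```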
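The enterErrorState() and start()/terminate() changes can be sketched as an AtomicBoolean lifecycle guard plus an error-state entry point that never throws. GuardedService is a hypothetical stand-in for HAQuorumService, and treating a non-running service as a NOP is an assumption made for the sketch. The flag check is a snapshot, so it narrows rather than eliminates the window in which a concurrent start() or terminate() can begin:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for HAQuorumService; names and behavior are illustrative only. */
class GuardedService {
    private static final Logger log = Logger.getLogger(GuardedService.class.getName());

    /** true while start() or terminate() is executing. */
    private final AtomicBoolean inStartOrTerminate = new AtomicBoolean(false);
    private volatile boolean running = false;
    /** Counts the error-state transitions that were actually performed. */
    private final AtomicInteger errorEntries = new AtomicInteger();

    /** Starts the service; duringStart simulates work performed inside start(). */
    void start(Runnable duringStart) {
        enterTransition();
        try {
            duringStart.run();
            running = true;
        } finally {
            inStartOrTerminate.set(false);
        }
    }

    void terminate() {
        enterTransition();
        try {
            running = false;
        } finally {
            inStartOrTerminate.set(false);
        }
    }

    /** Defensive: never throws; a NOP inside start()/terminate() or when not running. */
    void enterErrorState() {
        try {
            if (inStartOrTerminate.get() || !running)
                return; // NOP
            errorEntries.incrementAndGet();
            // Real code would do a local abort and leave the quorum here.
        } catch (Throwable t) {
            // Nothing escapes: log and continue.
            log.log(Level.WARNING, "Ignored in enterErrorState()", t);
        }
    }

    int errorEntryCount() {
        return errorEntries.get();
    }

    private void enterTransition() {
        if (!inStartOrTerminate.compareAndSet(false, true))
            throw new IllegalStateException("start() or terminate() already running");
    }
}
```

Throwing on a re-entrant start()/terminate() is a choice made for the sketch; the commit message only says the flag guards those methods.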
TODO:
- Review abort2Phase(). For some invocation contexts, this should be restricted to services that were joined with the met quorum and that voted YES for the PREPARE message.
- QuorumCommitImpl: I have reviewed its methods, including the cancellation of remote futures and the error paths, and modified it to use the local executor service to submit RMI requests in parallel. This simplified the code paths for interrupt and error handling for RMI requests and for failures to locate the proxy for a service. However, the new code causes some problems with the HA CI test suite and is being withheld while I investigate those issues in more depth.
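Taken together with the CommitState/Prepare2Phase/Commit2Phase simplification, the TODO items above describe a coordinator that sends PREPARE to the joined services in parallel on a local executor, commits only if enough services vote YES, and directs any abort only at the services that voted YES. A minimal sketch under those assumptions; Participant and TwoPhaseCoordinator are hypothetical (not bigdata's HAGlue or QuorumCommitImpl API), the majority threshold (replicationFactor / 2 + 1) is an assumption, and timeouts and proxy-lookup failures are omitted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical participant interface; not the bigdata HAGlue/QuorumService API. */
interface Participant {
    /** PREPARE: validate the write set; return true to vote YES. */
    boolean prepare(long commitCounter) throws Exception;
    /** COMMIT: lay down the prepared root block. */
    void commit(long commitCounter) throws Exception;
    /** ABORT: discard the dirty write set. Must not throw. */
    void abort(long commitCounter);
}

final class TwoPhaseCoordinator {
    private final ExecutorService executor;

    TwoPhaseCoordinator(ExecutorService executor) {
        this.executor = executor;
    }

    /** Returns false if PREPARE is rejected; throws if COMMIT fails after a YES consensus. */
    boolean commit(List<Participant> joined, int replicationFactor, long commitCounter) {
        // Phase 1: submit PREPARE to every joined service in parallel.
        List<Future<Boolean>> votes = new ArrayList<>();
        for (Participant p : joined)
            votes.add(executor.submit(() -> p.prepare(commitCounter)));

        List<Participant> yes = new ArrayList<>();
        for (int i = 0; i < joined.size(); i++) {
            try {
                if (votes.get(i).get()) yes.add(joined.get(i));
            } catch (ExecutionException e) {
                // A failed PREPARE counts as a NO vote.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                votes.forEach(f -> f.cancel(true));
                abortAll(joined, commitCounter); // votes unknown: abort everyone
                return false;
            }
        }

        if (yes.size() < replicationFactor / 2 + 1) {
            abortAll(yes, commitCounter); // NO voters already discarded their write sets
            return false;
        }

        // Phase 2: COMMIT on the YES voters in parallel.
        List<Future<?>> acks = new ArrayList<>();
        for (Participant p : yes)
            acks.add(executor.submit(() -> { p.commit(commitCounter); return null; }));
        try {
            for (Future<?> f : acks) f.get();
        } catch (ExecutionException | InterruptedException e) {
            if (e instanceof InterruptedException) Thread.currentThread().interrupt();
            abortAll(yes, commitCounter); // best effort only
            throw new IllegalStateException("COMMIT failed after YES consensus; leader should fail over", e);
        }
        return true;
    }

    private static void abortAll(List<Participant> targets, long commitCounter) {
        for (Participant p : targets) {
            try {
                p.abort(commitCounter);
            } catch (RuntimeException ignored) {
                // Best effort: keep aborting the remaining services.
            }
        }
    }
}
```

Once any service has acknowledged COMMIT, an abort cannot undo it. That is why the sketch escalates a COMMIT failure as an exception, in the spirit of the leader fail-over described in commitHA(), instead of simply returning false.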
I have run through the HA CI test suite, RWStore test suite, WORM test suite, and TestBigdataSailWithQuads. All is green.
See #760
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/Journal.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-10-30 19:11:05 UTC (rev 7500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-10-30 19:19:39 UTC (rev 7501)
@@ -160,19 +160,21 @@
* onto the {@link HALogWriter}.
* <p>
* Note: This method is ONLY invoked as part of the 2-phase commit protocol.
- * Therefore, it ONLY applies to the live HALog file. A service is
+ * Therefore, this method ONLY applies to the live HALog file. A service is
* atomically either joined with the met quorum at a 2-phase commit point or
- * not joined. This information is passed through from the 2-phase prepare
- * in the <i>isJoinedService</i> argument.
+ * not joined. The PREPARE and COMMIT messages are ONLY generated for
+ * services that were joined with the met quorum as of that atomic decision
+ * point in the commit protocol. Therefore, this method is never called for
+ * a service that was not joined as of that atomic decision point.
*
- * @param isJoinedService
- * <code>true</code> iff the service was joined with the met
- * quorum at the atomic decision point in the 2-phase commit
- * protocol.
* @param rootBlock
* The root block for the commit point that was just achieved.
*/
- void logRootBlock(final boolean isJoinedService,
+// * @param isJoinedService
+// * <code>true</code> iff the service was joined with the met
+// * quorum at the atomic decision point in the 2-phase commit
+// * protocol.
+ void logRootBlock(//final boolean isJoinedService,
final IRootBlockView rootBlock) throws IOException;
/**
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-10-30 19:11:05 UTC (rev 7500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-10-30 19:19:39 UTC (rev 7501)
@@ -145,10 +145,10 @@
}
@Override
- public void logRootBlock(final boolean isJoinedService,
+ public void logRootBlock(//final boolean isJoinedService,
final IRootBlockView rootBlock) throws IOException {
- QuorumServiceBase.this.logRootBlock(isJoinedService, rootBlock);
+ QuorumServiceBase.this.logRootBlock(/*isJoinedService,*/ rootBlock);
}
@@ -294,7 +294,7 @@
* Note: The default implementation is a NOP.
*/
@Override
- public void logRootBlock(final boolean isJoinedService,
+ public void logRootBlock(//final boolean isJoinedService,
final IRootBlockView rootBlock) throws IOException {
// NOP
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-30 19:11:05 UTC (rev 7500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-30 19:19:39 UTC (rev 7501)
@@ -144,6 +144,7 @@
import com.bigdata.io.IDataRecord;
import com.bigdata.io.IDataRecordAccess;
import com.bigdata.io.SerializerUtil;
+import com.bigdata.io.writecache.WriteCacheService;
import com.bigdata.journal.Name2Addr.Entry;
import com.bigdata.mdi.IResourceMetadata;
import com.bigdata.mdi.JournalMetadata;
@@ -164,6 +165,7 @@
import com.bigdata.rwstore.IAllocationManager;
import com.bigdata.rwstore.IHistoryManager;
import com.bigdata.rwstore.IRWStrategy;
+import com.bigdata.rwstore.RWStore;
import com.bigdata.rwstore.sector.MemStrategy;
import com.bigdata.rwstore.sector.MemoryManager;
import com.bigdata.service.AbstractHATransactionService;
@@ -2765,23 +2767,25 @@
*/
_bufferStrategy.abort();
- /*
- * Discard hard references to any indices. The Name2Addr reference
- * will also be discarded below. This should be sufficient to ensure
- * that any index requested by the methods on the AbstractJournal
- * will be re-read from disk using the commit record which we
- * re-load below. This is necessary in order to discard any
- * checkpoints that may have been written on indices since the last
- * commit.
- *
- * FIXME Verify this is not required. Historical index references
- * should not be discarded on abort as they remain valid. Discarding
- * them admits the possibility of a non-canonicalizing cache for the
- * historical indices since an existing historical index reference
- * will continue to be held but a new copy of the index will be
- * loaded on the next request if we clear the cache here.
- */
+ /*
+ * The Name2Addr reference will be discarded below. This should be
+ * sufficient to ensure that any index requested by the methods on
+ * the AbstractJournal will be re-read from disk using the commit
+ * record which we re-load below. This is necessary in order to
+ * discard any checkpoints that may have been written on indices
+ * since the last commit (dirty indices that have not been
+ * checkpointed to the disk are discarded when we discard
+ * Name2Addr).
+ *
+ * Note: Historical index references should NOT be discarded on
+ * abort as they remain valid. Discarding them admits the
+ * possibility of a non-canonicalizing cache for the historical
+ * indices since an existing historical index reference will
+ * continue to be held but a new copy of the index will be loaded on
+ * the next request if we clear the cache here.
+ */
// historicalIndexCache.clear();
+
// discard the commit record and re-read from the store.
_commitRecord = _getCommitRecord();
@@ -3030,165 +3034,192 @@
}
- /**
- * An atomic commit is performed by directing each registered
- * {@link ICommitter} to flush its state onto the store using
- * {@link ICommitter#handleCommit(long)}. The address returned by that
- * method is the address from which the {@link ICommitter} may be reloaded
- * (and its previous address if its state has not changed). That address is
- * saved in the {@link ICommitRecord} under the index for which that
- * committer was {@link #registerCommitter(int, ICommitter) registered}. We
- * then force the data to stable store, update the root block, and force the
- * root block and the file metadata to stable store.
- * <p>
- * Note: Each invocation of this method MUST use a distinct
- * <i>commitTime</i> and the commitTimes MUST be monotonically increasing.
- * These guarantees support both the database version history mechanisms and
- * the High Availability mechanisms.
- *
- * @param commitTime
- * The commit time either of a transaction or of an unisolated
- * commit. Note that when mixing isolated and unisolated commits
- * you MUST use the same {@link ITimestampService} for both
- * purposes.
- *
- * @return The timestamp assigned to the commit record -or- 0L if there were
- * no data to commit.
- */
- // Note: Overridden by StoreManager (DataService).
- protected long commitNow(final long commitTime) {
-
- final WriteLock lock = _fieldReadWriteLock.writeLock();
+ /**
+ * Class to which we attach all of the little pieces of state during
+ * {@link AbstractJournal#commitNow(long)}.
+ * <p>
+ * The non-final fields in this class are laid out directly below the methods
+ * that set those fields. The methods in the class are laid out in the
+ * top-to-bottom order in which they are executed by commitNow().
+ */
+ static private class CommitState {
+
+ /**
+ * The timestamp at which the commit began.
+ */
+ private final long beginNanos;
- lock.lock();
+ /**
+ * The backing store.
+ */
+ private final AbstractJournal store;
- try {
-
- assertOpen();
+ /**
+ * The backing {@link IBufferStrategy} for the {@link #store}.
+ */
+ private final IBufferStrategy _bufferStrategy;
+
+ /**
+ * The quorum iff HA and <code>null</code> otherwise.
+ */
+ private final Quorum<HAGlue, QuorumService<HAGlue>> quorum;
- final long beginNanos = System.nanoTime();
+ /**
+ * Local HA service implementation (non-Remote) and <code>null</code> if
+ * not in an HA mode.
+ */
+ private final QuorumService<HAGlue> quorumService;
+
+ /**
+ * The commit time either of a transaction or of an unisolated commit.
+ * Note that when mixing isolated and unisolated commits you MUST use
+ * the same {@link ITimestampService} for both purposes.
+ */
+ private final long commitTime;
+
+ /**
+ * The current root block on the journal as of the start of the commit
+ * protocol.
+ */
+ private final IRootBlockView old;
- // #of bytes on the journal as of the previous commit point.
- final long byteCountBefore = _rootBlock.getNextOffset();
+ /**
+ * The quorum token associated with this commit point.
+ */
+ private final long commitToken;
+
+ /** The #of bytes on the journal as of the previous commit point. */
+ private final long byteCountBefore;
+
+ /**
+ * The commit counter that will be assigned to the new commit point.
+ */
+ private final long newCommitCounter;
+
+ /**
+ *
+ * @param store
+ * The backing store.
+ * @param commitTime
+ * The commit time either of a transaction or of an
+ * unisolated commit. Note that when mixing isolated and
+ * unisolated commits you MUST use the same
+ * {@link ITimestampService} for both purposes.
+ */
+ public CommitState(final AbstractJournal store, final long commitTime) {
- if (log.isInfoEnabled())
- log.info("commitTime=" + commitTime);
+ if (store == null)
+ throw new IllegalArgumentException();
+
+ this.beginNanos = System.nanoTime();
- assertCommitTimeAdvances(commitTime);
+ this.store = store;
- final IRootBlockView old = _rootBlock;
+ this.commitTime = commitTime;
- final long newCommitCounter = old.getCommitCounter() + 1;
+ this._bufferStrategy = store._bufferStrategy;
- /*
- * First, run each of the committers accumulating the updated root
- * addresses in an array. In general, these are btrees and they may
- * have dirty nodes or leaves that needs to be evicted onto the
- * store. The first time through, any newly created btrees will have
- * dirty empty roots (the btree code does not optimize away an empty
- * root at this time). However, subsequent commits without
- * intervening data written on the store should not cause any
- * committers to update their root address.
+ // Note: null if not HA.
+ this.quorum = store.quorum;
+
+ /*
+ * Local HA service implementation (non-Remote).
*
- * Note: This also checkpoints the deferred free block list.
- */
- final long[] rootAddrs = notifyCommitters(commitTime);
+ * Note: getClient() throws IllegalStateException if quorum exists
+ * and is not running.
+ */
+ this.quorumService = quorum == null ? null : quorum.getClient();
- /*
- * See if anything has been written on the store since the last
- * commit.
- */
- if (!_bufferStrategy.requiresCommit(_rootBlock)) {
+ this.old = store._rootBlock;
- /*
- * No data was written onto the store so the commit can not
- * achieve any useful purpose.
- */
+ // #of bytes on the journal as of the previous commit point.
+ this.byteCountBefore = store._rootBlock.getNextOffset();
- if (log.isInfoEnabled())
- log.info("Nothing to commit");
+ this.newCommitCounter = old.getCommitCounter() + 1;
- return 0L;
- }
-
+ this.commitToken = store.quorumToken;
+
+ store.assertCommitTimeAdvances(commitTime);
+
+ }
+
+ /**
+ * Notify {@link ICommitter}s to flush out application data. This sets
+ * the {@link #rootAddrs} for the {@link ICommitRecord}.
+ *
+ * @return <code>true</code> if the store is dirty and the commit should
+ * proceed and <code>false</code> otherwise.
+ */
+ private boolean notifyCommitters() {
+
/*
- * Explicitly call the RootBlockCommitter
+ * First, run each of the committers accumulating the updated root
+ * addresses in an array. In general, these are btrees and they may
+ * have dirty nodes or leaves that need to be evicted onto the
+ * store. The first time through, any newly created btrees will have
+ * dirty empty roots (the btree code does not optimize away an empty
+ * root at this time). However, subsequent commits without
+ * intervening data written on the store should not cause any
+ * committers to update their root address.
+ *
+ * Note: This also checkpoints the deferred free block list.
*/
- rootAddrs[PREV_ROOTBLOCK] = this.m_rootBlockCommitter
- .handleCommit(commitTime);
+ rootAddrs = store.notifyCommitters(commitTime);
- // Local HA service implementation (non-Remote).
- final QuorumService<HAGlue> quorumService = quorum == null ? null
- : quorum.getClient();
+ /*
+ * See if anything has been written on the store since the last
+ * commit.
+ */
+ if (!_bufferStrategy.requiresCommit(store._rootBlock)) {
- final IJoinedAndNonJoinedServices gatherJoinedAndNonJoinedServices;
- final IHANotifyReleaseTimeResponse consensusReleaseTime;
- if ((_bufferStrategy instanceof IHABufferStrategy)
- && quorum != null && quorum.isHighlyAvailable()) {
-
- /**
- * CRITICAL SECTION. We need obtain a distributed consensus for
- * the services joined with the met quorum concerning the
- * earliest commit point that is pinned by the combination of
- * the active transactions and the minReleaseAge on the TXS. New
- * transaction starts during this critical section will block
- * (on the leader or the folllower) unless they are guaranteed
- * to be allowable, e.g., based on the current minReleaseAge,
- * the new tx would read from the most recent commit point, the
- * new tx would ready from a commit point that is already pinned
- * by an active transaction on that node, etc.
+ /*
+ * Will not do commit.
*
- * Note: Lock makes this section MUTEX with awaitServiceJoin().
- *
- * @see <a href=
- * "https://docs.google.com/document/d/14FO2yJFv_7uc5N0tvYboU-H6XbLEFpvu-G8RhAzvxrk/edit?pli=1#"
- * > HA TXS Design Document </a>
- *
- * @see <a
- * href="https://sourceforge.net/apps/trac/bigdata/ticket/623"
- * > HA TXS / TXS Bottleneck </a>
+ * Note: No data was written onto the store so the commit can
+ * not achieve any useful purpose.
*/
- _gatherLock.lock();
+ return false;
- try {
-
- // Atomic decision point for GATHER re joined services.
- gatherJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices(
- quorum);
-
- // Run the GATHER protocol.
- consensusReleaseTime = ((AbstractHATransactionService) getLocalTransactionManager()
- .getTransactionService())
- .updateReleaseTimeConsensus(newCommitCounter,
- commitTime,
- gatherJoinedAndNonJoinedServices.getJoinedServiceIds(),
- getHAReleaseTimeConsensusTimeout(),
- TimeUnit.MILLISECONDS);
+ }
+
+ /*
+ * Explicitly call the RootBlockCommitter
+ *
+ * Note: This logs the current root block and sets the address of
+ * that root block as a root address in the commitRecord.
+ * This is of potential use solely in disaster recovery scenarios
+ * where your root blocks are toast, but good root blocks can be
+ * found elsewhere in the file. Once you find a root block, you can
+ * get the commitRecordIndex and then find earlier root blocks using
+ * that root addr. Or you can just scan the file looking for valid
+ * root blocks and then use the most recent one that you can find.
+ */
+ rootAddrs[PREV_ROOTBLOCK] = store.m_rootBlockCommitter
+ .handleCommit(commitTime);
- } catch (Exception ex) {
+ // Will do commit.
+ return true;
- log.error(ex, ex);
-
- // Wrap and rethrow.
- throw new RuntimeException(ex);
-
- } finally {
+ }
- _gatherLock.unlock();
-
- }
-
- } else {
-
- /*
- * Not HA. Did not do GATHER.
- */
-
- gatherJoinedAndNonJoinedServices = null;
- consensusReleaseTime = null;
-
- } // if (HA) do GATHER
+ /**
+ * The new root addresses for the {@link ICommitRecord}.
+ *
+ * @see #notifyCommitters()
+ */
+ private long[] rootAddrs;
+
+ /**
+ * Write out the {@link ICommitRecord}, noting the
+ * {@link #commitRecordAddr}, add the {@link ICommitRecord} to the
+ * {@link CommitRecordIndex}. Finally, checkpoint the
+ * {@link CommitRecordIndex} setting the {@link #commitRecordIndexAddr}.
+ * <p>
+ * Note: This is also responsible for recycling the deferred frees for
+ * {@link IHistoryManager} backends.
+ */
+ private void writeCommitRecord() {
/*
* Before flushing the commitRecordIndex we need to check for
@@ -3202,23 +3233,15 @@
*/
if (_bufferStrategy instanceof IHistoryManager) {
- ((IHistoryManager) _bufferStrategy).checkDeferredFrees(this);
+ ((IHistoryManager) _bufferStrategy)
+ .checkDeferredFrees(store);
}
- /*
- * Write the commit record onto the store.
- *
- * @todo Modify to log the current root block and set the address of
- * that root block in the commitRecord. This will be of use solely
- * in disaster recovery scenarios where your root blocks are toast,
- * but good root blocks can be found elsewhere in the file.
- */
-
final ICommitRecord commitRecord = new CommitRecord(commitTime,
newCommitCounter, rootAddrs);
- final long commitRecordAddr = write(ByteBuffer
+ this.commitRecordAddr = store.write(ByteBuffer
.wrap(CommitRecordSerializer.INSTANCE
.serialize(commitRecord)));
@@ -3226,8 +3249,8 @@
* Add the commit record to an index so that we can recover
* historical states efficiently.
*/
- _commitRecordIndex.add(commitRecordAddr, commitRecord);
-
+ store._commitRecordIndex.add(commitRecordAddr, commitRecord);
+
/*
* Flush the commit record index to the store and stash the address
* of its metadata record in the root block.
@@ -3239,22 +3262,484 @@
* CommitRecordIndex before we can flush the CommitRecordIndex to
* the store.
*/
- final long commitRecordIndexAddr = _commitRecordIndex
+ commitRecordIndexAddr = store._commitRecordIndex
.writeCheckpoint();
- final long commitToken = quorumToken;
+ }
+
+ /**
+ * The address of the {@link ICommitRecord}.
+ *
+ * @see #writeCommitRecord()
+ */
+ private long commitRecordAddr;
+
+ /**
+ * The address of the {@link CommitRecordIndex} once it has been
+ * checkpointed against the backing store.
+ * <p>
+ * Note: The address of the root of the {@link CommitRecordIndex} needs
+ * to go right into the {@link IRootBlockView}. We are unable to place
+ * it into the {@link ICommitRecord} since we need to serialize the
+ * {@link ICommitRecord}, get its address, and add the entry to the
+ * {@link CommitRecordIndex} before we can flush the
+ * {@link CommitRecordIndex} to the store.
+ *
+ * @see #writeCommitRecord()
+ */
+ private long commitRecordIndexAddr;
+
+ /**
+ * Call commit on {@link IBufferStrategy} prior to creating the new
+ * {@link IRootBlockView}. This will flush the {@link WriteCacheService}.
+ * For HA, that ensures that the write set has been replicated to the
+ * followers.
+ * <p>
+ * Note: required for {@link RWStore} since the metaBits allocations are
+ * not made until commit, leading to invalid addresses for recent store
+ * allocations.
+ * <p>
+ * Note: After this, we do not write anything on the backing store other
+ * than the root block. The rest of this code is dedicated to creating a
+ * properly formed root block. For a non-HA deployment, we just lay down
+ * the root block. For an HA deployment, we do a 2-phase commit.
+ * <p>
+ * Note: In HA, the followers lay down the replicated writes
+ * synchronously. Thus, they are guaranteed to be on local storage by
+ * the time the leader finishes WriteCacheService.flush(). This does not
+ * create much latency because the WriteCacheService drains the
+ * dirtyList in a separate thread.
+ */
+ private void flushWriteSet() {
+
+ _bufferStrategy.commit();
+
+ }
+
+ /**
+ * Create the new root block.
+ */
+ private void newRootBlock() {
+
+ /*
+ * The next offset at which user data would be written. Calculated,
+ * after commit!
+ */
+ final long nextOffset = _bufferStrategy.getNextOffset();
+
+ final long blockSequence;
+ if (_bufferStrategy instanceof IHABufferStrategy) {
+
+ // always available for HA.
+ blockSequence = ((IHABufferStrategy) _bufferStrategy)
+ .getBlockSequence();
+
+ } else {
+
+ blockSequence = old.getBlockSequence();
+
+ }
+
+ /*
+ * Update the firstCommitTime the first time a transaction commits
+ * and the lastCommitTime each time a transaction commits (these are
+ * commit timestamps of isolated or unisolated transactions).
+ */
+
+ final long firstCommitTime = (old.getFirstCommitTime() == 0L ? commitTime
+ : old.getFirstCommitTime());
+
+ final long priorCommitTime = old.getLastCommitTime();
+
+ if (priorCommitTime != 0L) {
+
+ /*
+ * This is a local sanity check to make sure that the commit
+ * timestamps are strictly increasing. An error will be reported
+ * if the commit time for the current (un)isolated transaction
+ * is not strictly greater than the last commit time on the
+ * store as read back from the current root block.
+ */
+
+ assertPriorCommitTimeAdvances(commitTime, priorCommitTime);
+
+ }
+
+ final long lastCommitTime = commitTime;
+ final long metaStartAddr = _bufferStrategy.getMetaStartAddr();
+ final long metaBitsAddr = _bufferStrategy.getMetaBitsAddr();
+
+ // Create the new root block.
+ newRootBlock = new RootBlockView(!old.isRootBlock0(),
+ old.getOffsetBits(), nextOffset, firstCommitTime,
+ lastCommitTime, newCommitCounter, commitRecordAddr,
+ commitRecordIndexAddr,
+ old.getUUID(), //
+ blockSequence,
+ commitToken,//
+ metaStartAddr, metaBitsAddr, old.getStoreType(),
+ old.getCreateTime(), old.getCloseTime(), old.getVersion(),
+ store.checker);
+
+ }
+
+ /**
+ * The new {@link IRootBlockView}.
+ *
+ * @see #newRootBlock()
+ */
+ private IRootBlockView newRootBlock;
+
+
+ /**
+ * Run the GATHER consensus protocol (iff HA).
+ */
+ private void gatherPhase() {
+
+ /*
+ * If not HA, do not do GATHER.
+ */
+
+ if (!(_bufferStrategy instanceof IHABufferStrategy))
+ return;
+
+ if (quorum == null)
+ return;
+
+ if (!quorum.isHighlyAvailable())
+ return;
+
+ /**
+ * CRITICAL SECTION. We need to obtain a distributed consensus for the
+ * services joined with the met quorum concerning the earliest
+ * commit point that is pinned by the combination of the active
+ * transactions and the minReleaseAge on the TXS. New transaction
+ * starts during this critical section will block (on the leader or
+ * the follower) unless they are guaranteed to be allowable, e.g.,
+ * based on the current minReleaseAge, the new tx would read from
+ * the most recent commit point, the new tx would read from a
+ * commit point that is already pinned by an active transaction on
+ * that node, etc.
+ *
+ * Note: Lock makes this section MUTEX with awaitServiceJoin().
+ *
+ * @see <a href=
+ * "https://docs.google.com/document/d/14FO2yJFv_7uc5N0tvYboU-H6XbLEFpvu-G8RhAzvxrk/edit?pli=1#"
+ * > HA TXS Design Document </a>
+ *
+ * @see <a
+ * href="https://sourceforge.net/apps/trac/bigdata/ticket/623"
+ * > HA TXS / TXS Bottleneck </a>
+ */
+
+ store._gatherLock.lock();
+
+ try {
+
+ // Atomic decision point for GATHER re joined services.
+ gatherJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices(
+ quorum);
+
+ // Run the GATHER protocol.
+ consensusReleaseTime = ((AbstractHATransactionService) store
+ .getLocalTransactionManager().getTransactionService())
+ .updateReleaseTimeConsensus(newCommitCounter,
+ commitTime, gatherJoinedAndNonJoinedServices
+ .getJoinedServiceIds(), store
+ .getHAReleaseTimeConsensusTimeout(),
+ TimeUnit.MILLISECONDS);
+
+ } catch (Exception ex) {
+
+ log.error(ex, ex);
+
+ // Wrap and rethrow.
+ throw new RuntimeException(ex);
+
+ } finally {
+
+ store._gatherLock.unlock();
+
+ }
+
+ }
+
+ /**
+ * Set by {@link #gatherPhase()} IFF HA.
+ */
+ private IJoinedAndNonJoinedServices gatherJoinedAndNonJoinedServices = null;
+ /**
+ * Set by {@link #gatherPhase()} IFF HA.
+ */
+ private IHANotifyReleaseTimeResponse consensusReleaseTime = null;
+
+ /**
+ * Simple (non-HA) commit.
+ */
+ private void commitSimple() {
+
+ /*
+ * Force application data to stable storage _before_
+ * we update the root blocks. This option guarantees
+ * that the application data is stable on the disk
+ * before the atomic commit. Some operating systems
+ * and/or file systems may otherwise choose an
+ * ordered write with the consequence that the root
+ * blocks are laid down on the disk before the
+ * application data and a hard failure could result
+ * in the loss of application data addressed by the
+ * new root blocks (data loss on restart).
+ *
+ * Note: We do not force the file metadata to disk.
+ * If that is done, it will be done by a force()
+ * after we write the root block on the disk.
+ */
+ if (store.doubleSync) {
+
+ _bufferStrategy.force(false/* metadata */);
+
+ }
+
+ // write the root block on to the backing store.
+ _bufferStrategy.writeRootBlock(newRootBlock, store.forceOnCommit);
+
+ if (_bufferStrategy instanceof IRWStrategy) {
+
+ /*
+ * Now the root blocks are down we can commit any transient
+ * state.
+ */
+
+ ((IRWStrategy) _bufferStrategy).postCommit();
+
+ }
+
+ // set the new root block.
+ store._rootBlock = newRootBlock;
+
+ // reload the commit record from the new root block.
+ store._commitRecord = store._getCommitRecord();
+
+ if (txLog.isInfoEnabled())
+ txLog.info("COMMIT: commitTime=" + commitTime);
+
+ }
+
+ /**
+ * HA mode commit (2-phase commit).
+ * <p>
+ * Note: We need to make an atomic decision here regarding whether a
+ * service is joined with the met quorum or not. This information will
+ * be propagated through the HA 2-phase prepare message so services will
+ * know how they must interpret the 2-phase prepare(), commit(), and
+ * abort() requests. The atomic decision is necessary in order to
+ * enforce a consistent role on a service that is resynchronizing and
+ * which might vote to join the quorum and enter the quorum
+ * asynchronously with respect to this decision point.
+ *
+ * TODO If necessary, we could also explicitly provide the zk version
+ * metadata for the znode that is the parent of the joined services.
+ * However, we would need an expanded interface to get that metadata
+ * from zookeeper out of the Quorum.
+ */
+ private void commitHA() {
+
+ try {
+
+ if(!prepare2Phase()) {
+
+ // PREPARE rejected.
+ throw new QuorumException("PREPARE rejected: nyes="
+ + prepareResponse.getYesCount() + ", replicationFactor="
+ + prepareResponse.replicationFactor());
+
+ }
+
+ commitRequest = new CommitRequest(prepareRequest,
+ prepareResponse);
+
+ quorumService.commit2Phase(commitRequest);
+
+ } catch (Throwable e) {
+
+ try {
+ /*
+ * Something went wrong. Any services that were in the
+ * pipeline could have a dirty write set. Services that
+ * voted NO will have already discarded their dirty write
+ * set. We issue an abort2Phase() to tell the other services
+ * to discard the dirty write set as well.
+ *
+ * TODO We only need to issue the 2-phase abort against
+ * those services that (a) were joined with the met quorum;
+ * and (b) voted YES in response to the PREPARE message (any
+ * service that voted NO already discarded its dirty write
+ * set).
+ *
+ * TODO The service should only do the 2-phase abort if the
+ * commitToken and commitCounter are valid. If the quorum
+ * breaks, then the services will move into the Error state
+ * and will do a local abort as part of that transition.
+ */
+ quorumService.abort2Phase(commitToken);
+ } catch (Throwable t) {
+ log.warn(t, t);
+ }
+
+ if (commitRequest != null) {
+ /*
+ * The quorum voted to commit, but something went wrong.
+ *
+ * This forces the leader to fail over. The quorum can then
+ * meet up again around a new consensus.
+ *
+ * Note: It is possible that a new consensus can not be
+ * formed. The 2-phase commit protocol does not handle some
+ * cases of compound failure. For example, consider the case
+ * where the HA cluster is running with a bare majority of
+ * services. All services that are active vote YES, but one
+ * of those services fails before processing the COMMIT
+ * message. The quorum will not meet again unless a new
+ * consensus can be formed from the services that remain up
+ * and the services that were not running. The services that
+ * were not running will be at an earlier commit point so
+ * they can not form a consensus with the services that
+ * remain up. Further, there can not be a majority among the
+ * services that were not running (except in the edge case
+ * where the failed commit was the first commit since the
+ * last commit on one of the services that was down at the
+ * start of that failed commit). Situations of this type
+ * require operator intervention, e.g., explicitly roll back
+ * the database or copy HALog files from one machine to
+ * another such that it will apply those HALog files on
+ * restart and form a consensus with the other services.
+ */
+ quorumService.enterErrorState();
+ }
+
+ // Wrap and re-throw the root cause exception.
+ throw new RuntimeException(e);
+
+ }
+
+ }
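The control flow of commitHA() runs PREPARE and then COMMIT. On any failure it issues a 2-phase abort, and it forces the error state only if COMMIT had already been issued. The illustrative sketch below uses a hypothetical Participant interface. Participant, FakeService and the enteredErrorState flag are stand-ins invented here for quorumService.prepare2Phase(), commit2Phase(), abort2Phase() and enterErrorState(). The simple-majority test, with the replication factor taken as the number of services, is an assumption about what PrepareResponse.willCommit() decides.

```java
import java.util.List;

/** Hypothetical stand-in for a service taking part in the 2-phase commit. */
interface Participant {
    boolean prepare(long commitCounter); // true == vote YES
    void commit(long commitCounter);
    void abort(long commitCounter);
}

/** Test double: votes as configured and can fail during COMMIT. */
final class FakeService implements Participant {
    final boolean voteYes;
    final boolean failOnCommit;
    boolean committed;
    boolean aborted;

    FakeService(boolean voteYes, boolean failOnCommit) {
        this.voteYes = voteYes;
        this.failOnCommit = failOnCommit;
    }

    public boolean prepare(long commitCounter) { return voteYes; }

    public void commit(long commitCounter) {
        if (failOnCommit) throw new IllegalStateException("COMMIT failed");
        committed = true;
    }

    public void abort(long commitCounter) { aborted = true; }
}

final class TwoPhaseCommitSketch {
    boolean enteredErrorState; // stands in for enterErrorState()

    void commitHA(List<? extends Participant> services, long commitCounter) {
        boolean commitIssued = false; // plays the role of [commitRequest != null]
        try {
            int nyes = 0;
            for (Participant p : services) {
                if (p.prepare(commitCounter)) nyes++;
            }
            int k = services.size(); // assumed: replicationFactor == #services
            if (nyes < k / 2 + 1) {
                throw new IllegalStateException("PREPARE rejected: nyes="
                        + nyes + ", replicationFactor=" + k);
            }
            commitIssued = true;
            for (Participant p : services) {
                p.commit(commitCounter);
            }
        } catch (Throwable e) {
            // Discard dirty write sets; an abort failure must not mask e.
            for (Participant p : services) {
                try {
                    p.abort(commitCounter);
                } catch (Throwable t) {
                    System.err.println("abort failed: " + t);
                }
            }
            if (commitIssued) {
                // Quorum voted YES but COMMIT failed: force a fail over.
                enteredErrorState = true;
            }
            throw new RuntimeException(e);
        }
    }
}
```

Funnelling a rejected PREPARE through the same catch block as a failed COMMIT gives one abort path instead of two, which mirrors the simplification described in the log message.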
+ // Fields set by the method above.
+ private CommitRequest commitRequest;
+
+ /**
+ * Return <code>true</code> iff the 2-phase PREPARE votes to COMMIT.
+ *
+ * @throws IOException
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ private boolean prepare2Phase() throws InterruptedException,
+ TimeoutException, IOException {
+
+ // Atomic decision point for joined vs non-joined services.
+ prepareJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices(
+ quorum);
+
+ prepareRequest = new PrepareRequest(//
+ consensusReleaseTime,//
+ gatherJoinedAndNonJoinedServices,//
+ prepareJoinedAndNonJoinedServices,//
+ newRootBlock,//
+ quorumService.getPrepareTimeout(), // timeout
+ TimeUnit.MILLISECONDS//
+ );
+
+ // issue prepare request.
+ prepareResponse = quorumService.prepare2Phase(prepareRequest);
+
+ if (haLog.isInfoEnabled())
+ haLog.info(prepareResponse.toString());
+
+ return prepareResponse.willCommit();
+
+ }
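prepare2Phase() captures joined versus non-joined services once, before the PREPARE message is built. A service that joins concurrently therefore cannot change the role it plays in this commit. The sketch below shows the idea with a copy-on-write membership list. JoinSnapshotSketch and its fields are hypothetical and say nothing about how JoinedAndNonJoinedServices actually reads the quorum state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical illustration of an atomic decision point for joined services. */
final class JoinSnapshotSketch {

    /** Live membership; other threads may add or remove services at any time. */
    final List<UUID> liveJoined = new CopyOnWriteArrayList<>();

    /** Immutable set of joined services captured at the decision point. */
    static final class Snapshot {
        private final List<UUID> joined;

        Snapshot(List<UUID> live) {
            // Copying a CopyOnWriteArrayList reads one consistent array.
            this.joined = Collections.unmodifiableList(new ArrayList<>(live));
        }

        boolean isJoined(UUID serviceId) {
            return joined.contains(serviceId);
        }
    }

    /** Decide once; later 2-phase messages are interpreted against this. */
    Snapshot decide() {
        return new Snapshot(liveJoined);
    }
}
```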
+ // Fields set by the method above.
+ private IJoinedAndNonJoinedServices prepareJoinedAndNonJoinedServices;
+ private PrepareRequest prepareRequest;
+ private PrepareResponse prepareResponse;
+
+ } // class CommitState.
+
+ /**
+ * An atomic commit is performed by directing each registered
+ * {@link ICommitter} to flush its state onto the store using
+ * {@link ICommitter#handleCommit(long)}. The address returned by that
+ * method is the address from which the {@link ICommitter} may be reloaded
+ * (or its previous address if its state has not changed). That address is
+ * saved in the {@link ICommitRecord} under the index for which that
+ * committer was {@link #registerCommitter(int, ICommitter) registered}. We
+ * then force the data to stable store, update the root block, and force the
+ * root block and the file metadata to stable store.
+ * <p>
+ * Note: Each invocation of this method MUST use a distinct
+ * <i>commitTime</i> and the commitTimes MUST be monotonically increasing.
+ * These guarantees support both the database version history mechanisms and
+ * the High Availability mechanisms.
+ *
+ * @param commitTime
+ * The commit time either of a transaction or of an unisolated
+ * commit. Note that when mixing isolated and unisolated commits
+ * you MUST use the same {@link ITimestampService} for both
+ * purposes.
+ *
+ * @return The timestamp assigned to the commit record -or- 0L if there were
+ * no data to commit.
+ */
+ // Note: Overridden by StoreManager (DataService).
+ protected long commitNow(final long commitTime) {
+
+ final WriteLock lock = _fieldReadWriteLock.writeLock();
+
+ lock.lock();
+
+ try {
+
+ assertOpen();
+
+ if (log.isInfoEnabled())
+ log.info("commitTime=" + commitTime);
+
+ final CommitState cs = new CommitState(this, commitTime);
+
+ /*
+ * Flush application data, decide whether or not the store is dirty,
+ * and return immediately if it is not dirty.
+ */
+ if (!cs.notifyCommitters()) {
+
+ if (log.isInfoEnabled())
+ log.info("Nothing to commit");
+
+ return 0L;
+
+ }
+
+ // Do GATHER (iff HA).
+ cs.gatherPhase();
+
+ /*
+ * Flush deferred frees (iff RWS), write the commit record onto the
+ * store, and write the commit record index onto the store.
+ */
+ cs.writeCommitRecord();
+
if (quorum != null) {
/*
* Verify that the last negotiated quorum is still valid.
*/
- quorum.assertLeader(commitToken);
+ quorum.assertLeader(cs.commitToken);
}
/*
* Conditionally obtain a lock that will protect the
* commit()/postCommit() protocol.
*/
- final long nextOffset;
+// final long nextOffset;
final Lock commitLock;
if (_bufferStrategy instanceof IRWStrategy) {
commitLock = ((IRWStrategy) _bufferStrategy).getCommitLock();
@@ -3266,271 +3751,48 @@
commitLock.lock();
}
try {
- /*
- * Call commit on buffer strategy prior to retrieving root block,
- * required for RWStore since the metaBits allocations are not made
- * until commit, leading to invalid addresses for recent store
- * allocations.
- *
- * Note: This will flush the write cache. For HA, that ensures that
- * the write set has been replicated to the followers.
- *
- * Note: After this, we do not write anything on the backing store
- * other than the root block. The rest of this code is dedicated to
- * creating a properly formed root block. For a non-HA deployment,
- * we just lay down the root block. For an HA deployment, we do a
- * 2-phase commit.
- *
- * Note: In HA, the followers lay down the replicated writes
- * synchronously. Thus, they are guaranteed to be on local storage
- * by the time the leader finishes WriteCacheService.flush(). This
- * does not create much latency because the WriteCacheService drains
- * the dirtyList in a separate thread.
- */
- _bufferStrategy.commit();
+
+ // Flush writes to the backing store / followers.
+ cs.flushWriteSet();
- /*
- * The next offset at which user data would be written.
- * Calculated, after commit!
- */
- nextOffset = _bufferStrategy.getNextOffset();
-
- final long blockSequence;
-
- if (_bufferStrategy instanceof IHABufferStrategy) {
-
- // always available for HA.
- blockSequence = ((IHABufferStrategy) _bufferStrategy)
- .getBlockSequence();
-
- } else {
-
- blockSequence = old.getBlockSequence();
-
- }
-
- /*
- * Prepare the new root block.
- */
- final IRootBlockView newRootBlock;
- {
-
- /*
- * Update the firstCommitTime the first time a transaction
- * commits and the lastCommitTime each time a transaction
- * commits (these are commit timestamps of isolated or
- * unisolated transactions).
- */
-
- final long firstCommitTime = (old.getFirstCommitTime() == 0L ? commitTime
- : old.getFirstCommitTime());
-
- final long priorCommitTime = old.getLastCommitTime();
-
- if (priorCommitTime != 0L) {
-
- /*
- * This is a local sanity check to make sure that the commit
- * timestamps are strictly increasing. An error will be
- * reported if the commit time for the current (un)isolated
- * transaction is not strictly greater than the last commit
- * time on the store as read back from the current root
- * block.
- */
-
- assertPriorCommitTimeAdvances(commitTime, priorCommitTime);
-
- }
-
- final long lastCommitTime = commitTime;
- final long metaStartAddr = _bufferStrategy.getMetaStartAddr();
- final long metaBitsAddr = _bufferStrategy.getMetaBitsAddr();
-
- // Create the new root block.
- newRootBlock = new RootBlockView(!old.isRootBlock0(), old
- .getOffsetBits(), nextOffset, firstCommitTime,
- lastCommitTime, newCommitCounter, commitRecordAddr,
- commitRecordIndexAddr, old.getUUID(), //
- blockSequence, commitToken,//
- metaStartAddr, metaBitsAddr, old.getStoreType(),
- old.getCreateTime(), old.getCloseTime(),
- old.getVersion(), checker);
-
- }
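The removed block above derives the new root block's timestamps from the old one. firstCommitTime is set on the first commit only, and lastCommitTime becomes the new commitTime. Commit times must strictly advance, and the new block goes into the other of the two root block slots (!old.isRootBlock0()). The minimal sketch below covers only those rules. RootBlockSketch is hypothetical. Incrementing the commit counter by one is an assumption, since newCommitCounter is computed outside this excerpt.

```java
/** Hypothetical, reduced root block holding only the fields discussed above. */
final class RootBlockSketch {
    final boolean rootBlock0;   // which of the two root block slots
    final long firstCommitTime; // 0L until the first commit
    final long lastCommitTime;  // 0L until the first commit
    final long commitCounter;

    RootBlockSketch(boolean rootBlock0, long firstCommitTime,
            long lastCommitTime, long commitCounter) {
        this.rootBlock0 = rootBlock0;
        this.firstCommitTime = firstCommitTime;
        this.lastCommitTime = lastCommitTime;
        this.commitCounter = commitCounter;
    }

    /** Builds the successor root block for a commit at commitTime. */
    RootBlockSketch next(long commitTime) {
        final long priorCommitTime = lastCommitTime;
        if (priorCommitTime != 0L && commitTime <= priorCommitTime) {
            // Mirrors the intent of assertPriorCommitTimeAdvances().
            throw new IllegalArgumentException("commitTime=" + commitTime
                    + " does not advance priorCommitTime=" + priorCommitTime);
        }
        final long first = (firstCommitTime == 0L) ? commitTime : firstCommitTime;
        // Alternate slots so the previous root block stays intact on disk.
        return new RootBlockSketch(!rootBlock0, first, commitTime,
                commitCounter + 1); // assumed: counter advances by one
    }
}
```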
-
+ // Prepare the new root block.
+ cs.newRootBlock();
+
if (quorum == null) {
- /*
- * Non-HA mode.
- */
+ // Non-HA mode.
+ cs.commitSimple();
- /*
- * Force application data to stable storage _before_
- * we update the root blocks. This option guarantees
- * that the application data is stable on the disk
- * before the atomic commit. Some operating systems
- * and/or file systems may otherwise choose an
- * ordered write with the consequence that the root
- * blocks are laid down on the disk before the
- * application data and a hard failure could result
- * in the loss of application data addressed by the
- * new root blocks (data loss on restart).
- *
- * Note: We do not force the file metadata to disk.
- * If that is done, it will be done by a force()
- * after we write the root block on the disk.
- */
- if (doubleSync) {
-
- _bufferStrategy.force(false/* metadata */);
-
- }
-
- // write the root block on to the backing store.
- _bufferStrategy.writeRootBlock(newRootBlock, forceOnCommit);
-
- if (_bufferStrategy instanceof IRWStrategy) {
-
- /*
- * Now the root blocks are down we can commit any transient
- * state.
- */
-
- ((IRWStrategy) _bufferStrategy).postCommit();
-
- }
-
- // set the new root block.
- _rootBlock = newRootBlock;
-
- // reload the commit record from the new root block.
- _commitRecord = _getCommitRecord();
-
- if (txLog.isInfoEnabled())
- txLog.info("COMMIT: commitTime=" + commitTime);
-
} else {
-
- /*
- * HA mode.
- *
- * Note: We need to make an atomic decision here regarding
- * whether a service is joined with the met quorum or not. This
- * information will be propagated through the HA 2-phase prepare
- * message so services will know how they must interpret the
- * 2-phase prepare(), commit(), and abort() requests. The atomic
- * decision is necessary in order to enforce a consistent role
- * on a service that is resynchronizing and which might vote to
- * join the quorum and enter the quorum asynchronously with
- * respect to this decision point.
- *
- * TODO If necessary, we could also explicitly provide the zk
- * version metadata for the znode that is the parent of the
- * joined services. However, we would need an expanded interface
- * to get that metadata from zookeeper out of the Quorum.
- */
-
- boolean didVoteYes = false;
- try {
-
- // Atomic decision point for joined vs non-joined services.
- final IJoinedAndNonJoinedServices prepareJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices(
- quorum);
-
- final PrepareRequest req = new PrepareRequest(//
- consensusReleaseTime,//
- gatherJoinedAndNonJoinedServices,//
- prepareJoinedAndNonJoinedServices,//
- newRootBlock,//
- quorumService.getPrepareTimeout(), // timeout
- TimeUnit.MILLISECONDS//
- );
-
- // issue prepare request.
- final PrepareResponse resp = quorumService
- .prepare2Phase(req);
-
- if (haLog.isInfoEnabled())
- haLog.info(resp.toString());
-
- if (resp.willCommit()) {
-
- didVoteYes = true;
-
- quorumService
- .commit2Phase(new CommitRequest(req, resp));
-
- } else {
-
- /*
- * TODO We only need to issue the 2-phase abort
- * against those services that (a) were joined with
- * the met quorum; and (b) voted YES in response to
- * the PREPARE message.
- */
- try {
- quorumService.abort2Phase(commitToken);
- } finally {
- throw new RuntimeException(
- "PREPARE rejected: nyes="
- + resp.getYesCount()
- + ", replicationFactor="
- + resp.replicationFactor());
- }
-
- }
-
- } catch (Throwable e) {
- if (didVoteYes) {
- /*
- * The quorum voted to commit, but something went wrong.
- *
- * FIXME RESYNC : At this point the quorum is probably
- * inconsistent in terms of their root blocks. Rather
- * than attempting to send an abort() message to the
- * quorum, we probably should force the leader to yield
- * its role at which point the quorum will attempt to
- * elect a new master and resynchronize.
- */
- if (quorumService != null) {
- try {
- quorumService.abort2Phase(commitToken);
- } catch (Throwable t) {
- log.warn(t, t);
- }
- }
- } else {
- /*
- * This exception was thrown during the abort handling
- * logic. Note that we were already attempting a 2-phase
- * abort since the quorum did not vote "yes".
- *
- * TODO We should probably force a quorum break since
- * there is clearly something wrong with the lines of
- * communication among the nodes.
- */
- }
- throw new RuntimeException(e);
- }
-
+
+ // HA mode commit (2-phase commit).
+ cs.commitHA();
+
} // else HA mode
} finally {
- if(commitLock != null) {
+
+ if (commitLock != null) {
/*
* Release the [commitLock] iff one was taken above.
*/
commitLock.unlock();
}
+
}
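commitNow() nests two locks. The write lock from _fieldReadWriteLock covers the whole method. A commit lock covers the commit()/postCommit() protocol, but only when the buffer strategy is an IRWStrategy, and it is released in a finally clause. The sketch below shows that conditional acquire/release pattern with java.util.concurrent locks. CommitLockSketch and its dirty flag are hypothetical. The inner try body only marks where flushWriteSet(), newRootBlock() and commitSimple()/commitHA() would run.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical illustration of the lock nesting used by commitNow(). */
final class CommitLockSketch {
    private final ReentrantReadWriteLock fieldReadWriteLock = new ReentrantReadWriteLock();
    // Only RW-style stores expose a commit lock; null otherwise.
    private final ReentrantLock rwCommitLock;

    CommitLockSketch(boolean rwStore) {
        this.rwCommitLock = rwStore ? new ReentrantLock() : null;
    }

    long commitNow(long commitTime, boolean dirty) {
        final Lock lock = fieldReadWriteLock.writeLock();
        lock.lock();
        try {
            if (!dirty) {
                return 0L; // nothing to commit
            }
            final Lock commitLock = rwCommitLock; // may be null
            if (commitLock != null) {
                commitLock.lock();
            }
            try {
                // flush write set, build the new root block, then commit
                return commitTime;
            } finally {
                if (commitLock != null) {
                    commitLock.unlock(); // release iff taken above
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /** True when neither lock is still held after a commit attempt. */
    boolean locksReleased() {
        return !fieldReadWriteLock.isWriteLocked()
                && (rwCommitLock == null || !rwCommitLock.isLocked());
    }
}
```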
- final long elapsedNanos = System.nanoTime() - beginNanos;
+ final long elapsedNanos = System.nanoTime() - cs.beginNanos;
if (BigdataStatics.debug || log.isInfoEnabled()) {
- final String msg = "commit: commitTime=" + commitTime + ", latency="
- + TimeUnit.NANOSECONDS.toMillis(elapsedNanos) + ", nextOffset=" + nextOffset + ", byteCount="
- + (nextOffset - byteCountBefore);
- if (BigdataStatics.debug)
+ final String msg = "commit: commitTime="
+ + cs.commitTime
+ + ", latency="
+ + TimeUnit.NANOSECONDS.toMillis(elapsedNanos)
+ + ", nextOffset="
+ + cs.newRootBlock.getNextOffset()
+ + ", byteCo...
[truncated message content] |