From: <tho...@us...> - 2013-05-13 15:21:28
|
Revision: 7125 http://bigdata.svn.sourceforge.net/bigdata/?rev=7125&view=rev Author: thompsonbry Date: 2013-05-13 15:21:20 +0000 (Mon, 13 May 2013) Log Message: ----------- HAJournal/AbstractJournal: Added validation of the root block and the last live write message. HAJournalServer/Journal/AbstractJournal: Added explicit configuration of the release time consensus protocol timeout. default is currently 10s (same as for the prepare-2phase timeout). Renamed the methods on the IHANotifyReleaseTimeRequest message to indicate that they are "pinned" commit times and commit counters rather than the current commit time and commit counter. This was done to reduce confusion. Modified Paths: -------------- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HANotifyReleaseTimeRequest.java branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/IHANotifyReleaseTimeRequest.java branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Journal.java branches/READ_CACHE/bigdata/src/java/com/bigdata/service/AbstractHATransactionService.java branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HANotifyReleaseTimeRequest.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HANotifyReleaseTimeRequest.java 2013-05-13 15:13:37 UTC (rev 7124) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/HANotifyReleaseTimeRequest.java 2013-05-13 15:21:20 UTC (rev 7125) @@ -33,39 +33,40 @@ private static final long serialVersionUID = 1L; private final UUID serviceUUID; - private final long commitTime; - private final long commitCounter; + private final long pinnedCommitTime; + private final long pinnedCommitCounter; private final long timestamp; 
public HANotifyReleaseTimeRequest(final UUID serviceUUID, - final long commitTime, final long commitCounter, + final long pinnedCommitTime, final long pinnedCommitCounter, final long timestamp) { this.serviceUUID = serviceUUID; - this.commitTime = commitTime; - this.commitCounter = commitCounter; + this.pinnedCommitTime = pinnedCommitTime; + this.pinnedCommitCounter = pinnedCommitCounter; this.timestamp = timestamp; } @Override public String toString() { return super.toString() + "{serviceUUID=" + serviceUUID - + ",commitTime=" + commitTime + ",commitCounter=" - + commitCounter + ",timestamp=" + timestamp + "}"; + + ",pinnedCommitTime=" + pinnedCommitTime + + ",pinnedCommitCounter=" + pinnedCommitCounter + ",timestamp=" + + timestamp + "}"; } - + @Override public UUID getServiceUUID() { return serviceUUID; } @Override - public long getCommitTime() { - return commitTime; + public long getPinnedCommitTime() { + return pinnedCommitTime; } @Override - public long getCommitCounter() { - return commitCounter; + public long getPinnedCommitCounter() { + return pinnedCommitCounter; } @Override Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/IHANotifyReleaseTimeRequest.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/IHANotifyReleaseTimeRequest.java 2013-05-13 15:13:37 UTC (rev 7124) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/msg/IHANotifyReleaseTimeRequest.java 2013-05-13 15:21:20 UTC (rev 7125) @@ -44,12 +44,12 @@ /** * The earliest pinned commit time on the follower. */ - public long getCommitTime(); + public long getPinnedCommitTime(); /** * The earliest pinned commit counter on the follower. */ - public long getCommitCounter(); + public long getPinnedCommitCounter(); // /** // * The readsOnCommitTime of the earliest active transaction on the follower. 
Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-05-13 15:13:37 UTC (rev 7124) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-05-13 15:21:20 UTC (rev 7125) @@ -139,7 +139,6 @@ import com.bigdata.io.IDataRecordAccess; import com.bigdata.io.SerializerUtil; import com.bigdata.journal.Name2Addr.Entry; -import com.bigdata.journal.jini.ha.HAJournalServer; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.mdi.JournalMetadata; import com.bigdata.quorum.AsynchronousQuorumCloseException; @@ -1731,8 +1730,6 @@ /** * The HA timeout in milliseconds for a 2-phase prepare. * - * @see HAJournalServer.ConfigurationOptions#HA_PREPARE_TIMEOUT - * * @throws UnsupportedOperationException * always. */ @@ -1742,7 +1739,19 @@ } - /** + /** + * The HA timeout in milliseconds for the release time consensus protocol. + * + * @throws UnsupportedOperationException + * always. + */ + public long getHAReleaseTimeConsensusTimeout() { + + throw new UnsupportedOperationException(); + + } + + /** * Core implementation of immediate shutdown handles event reporting. */ protected void _close() { @@ -2923,8 +2932,10 @@ ((AbstractHATransactionService) getLocalTransactionManager() .getTransactionService()) - .updateReleaseTimeConsensus(); - + .updateReleaseTimeConsensus( + getHAReleaseTimeConsensusTimeout(), + TimeUnit.MILLISECONDS); + } catch (Exception ex) { // Wrap and rethrow. @@ -6140,48 +6151,9 @@ if (rootBlock == null) throw new IllegalStateException(); - if (!rootBlock.getUUID().equals( - AbstractJournal.this._rootBlock.getUUID())) { + // Validate the new root block against the current root block. + validateNewRootBlock(isJoined, isLeader, AbstractJournal.this._rootBlock, rootBlock); - /* - * The root block has a different UUID. 
We can not accept this - * condition. - */ - - throw new IllegalStateException(); - - } - - if (rootBlock.getLastCommitTime() <= AbstractJournal.this._rootBlock - .getLastCommitTime()) { - - /* - * The root block has a commit time that is LTE the most recent - * commit on this Journal. We can not accept this condition. - */ - - throw new IllegalStateException(); - - } - - if (rootBlock.getCommitCounter() <= AbstractJournal.this._rootBlock - .getCommitCounter()) { - - /* - * The root block has a commit counter that is LTE the most - * recent commit counter on this Journal. We can not accept this - * condition. - */ - - throw new IllegalStateException(); - - } - - // the quorum token from the leader is in the root block. - final long prepareToken = rootBlock.getQuorumToken(); - - quorum.assertQuorum(prepareToken); - /* * if(follower) {...} */ @@ -6266,6 +6238,93 @@ } + /** + * Validate the new root block against the current root block. This + * method checks a variety of invariants: + * <ul> + * <li>The UUID of the store must be the same.</li> + * <li>The commitTime must be strictly increasing.</li> + * <li>The commitCounter must increase by ONE (1).</li> + * <li></li> + * </ul> + * + * @param isJoined + * iff this service was joined at the atomic decision point + * in the 2-phase commit protocol. + * @param isLeader + * iff this service is the leader for this commit. + * @param oldRB + * the old (aka current) root block. + * @param newRB + * the new (aka proposed) root block. + */ + protected void validateNewRootBlock(final boolean isJoined, + final boolean isLeader, final IRootBlockView oldRB, + final IRootBlockView newRB) { + + if (oldRB == null) + throw new IllegalStateException(); + + if (newRB == null) + throw new IllegalStateException(); + + // Validate UUID of store is consistent. + if (!newRB.getUUID().equals(oldRB.getUUID())) { + + /* + * The root block has a different UUID. We can not accept this + * condition. 
+ */ + + throw new IllegalStateException("Store UUID: old=" + + oldRB.getUUID() + " != new=" + newRB.getUUID()); + + } + + // Validate commit time is strictly increasing. + if (newRB.getLastCommitTime() <= oldRB.getLastCommitTime()) { + + /* + * The root block has a commit time that is LTE the most recent + * commit on this Journal. We can not accept this condition. + */ + + throw new IllegalStateException("lastCommitTime: old=" + + oldRB.getLastCommitTime() + " > new=" + + newRB.getLastCommitTime()); + + } + + // Validate the new commit counter. + { + + final long newcc = newRB.getCommitCounter(); + + final long oldcc = oldRB.getCommitCounter(); + + if (newcc != (oldcc + 1)) { + + /* + * The new root block MUST have a commit counter that is ONE + * more than the current commit counter on this Journal. We + * can not accept any other value for the commit counter. + */ + + throw new IllegalStateException("commitCounter: ( old=" + + oldcc + " + 1 ) != new=" + newcc); + + } + + // The quorum token from the leader is in the root block. + final long prepareToken = newRB.getQuorumToken(); + + // Verify that the same quorum is still met. + quorum.assertQuorum(prepareToken); + + } + + } + @Override public Future<Void> commit2Phase( final IHA2PhaseCommitMessage commitMessage) { @@ -6361,7 +6420,13 @@ * Only the services that are joined go through the * commit protocol. */ - + if (localService == null) { + /* + * The quorum has been terminated. We can't go + * through the 2-phase commit. 
+ */ + throw new IllegalStateException(); + } AbstractJournal.this.doLocalCommit(localService, rootBlock); Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Journal.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Journal.java 2013-05-13 15:13:37 UTC (rev 7124) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Journal.java 2013-05-13 15:21:20 UTC (rev 7125) @@ -464,8 +464,8 @@ if (log.isTraceEnabled()) log.trace("follower: " + response); - if (minimumResponse.getCommitCounter() > response - .getCommitCounter()) { + if (minimumResponse.getPinnedCommitCounter() > response + .getPinnedCommitCounter()) { minimumResponse = response; @@ -481,8 +481,8 @@ // Restate the consensus as an appropriate message object. consensus = new HANotifyReleaseTimeResponse( - minimumResponse.getCommitTime(), - minimumResponse.getCommitCounter()); + minimumResponse.getPinnedCommitTime(), + minimumResponse.getPinnedCommitCounter()); if (log.isTraceEnabled()) log.trace("consensus: " + consensus); @@ -510,8 +510,9 @@ * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/673" > * Native thread leak in HAJournalServer process </a> */ - private void messageFollowers(final long token) throws IOException, - InterruptedException, BrokenBarrierException, TimeoutException { + private void messageFollowers(final long token, final long timeout, + final TimeUnit units) throws IOException, InterruptedException, + BrokenBarrierException, TimeoutException { getQuorum().assertLeader(token); @@ -600,8 +601,8 @@ // } // } - try { // FIXME HA TXS : Configuration option for timeout (lift into caller, config @ HAJournal(Server) similar to other timeout. Could be total timeout across 2-phase commit protocol). - barrier.await(20, TimeUnit.SECONDS); + try { + barrier.await(timeout, units); // fall through. 
} catch (TimeoutException e) { throw e; @@ -808,8 +809,9 @@ */ // Note: Executed on the leader. @Override - public void updateReleaseTimeConsensus() throws IOException, - InterruptedException, TimeoutException, BrokenBarrierException { + public void updateReleaseTimeConsensus(final long timeout, + final TimeUnit units) throws IOException, InterruptedException, + TimeoutException, BrokenBarrierException { final long token = getQuorum().token(); @@ -833,7 +835,7 @@ /* * Message the followers and block until the barrier breaks. */ - barrierState.messageFollowers(token); + barrierState.messageFollowers(token,timeout,units); } finally { Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/service/AbstractHATransactionService.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/service/AbstractHATransactionService.java 2013-05-13 15:13:37 UTC (rev 7124) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/service/AbstractHATransactionService.java 2013-05-13 15:21:20 UTC (rev 7125) @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.Properties; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.bigdata.ha.HATXSGlue; @@ -67,9 +68,15 @@ /** * Coordinate the update of the <i>releaseTime</i> on each service that is * joined with the met quorum. + * + * @param timeout + * The timeout for the release time consensus protocol. + * @param units + * The units for that timeout. */ - abstract public void updateReleaseTimeConsensus() throws IOException, - TimeoutException, InterruptedException, Exception; + abstract public void updateReleaseTimeConsensus(final long timeout, + final TimeUnit units) throws IOException, TimeoutException, + InterruptedException, Exception; /** * Used to make a serviceJoin() MUTEX with the consensus protocol. 
Modified: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-05-13 15:13:37 UTC (rev 7124) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-05-13 15:21:20 UTC (rev 7125) @@ -183,10 +183,15 @@ private final InetSocketAddress writePipelineAddr; /** - * @see Options#HA_PREPARE_TIMEOUT + * @see HAJournalServer.ConfigurationOptions#HA_PREPARE_TIMEOUT */ private final long haPrepareTimeout; + /** + * @see HAJournalServer.ConfigurationOptions#HA_RELEASE_TIME_CONSENSUS_TIMEOUT + */ + private final long haReleaseTimeConsensusTimeout; + // /** // * @see HAJournalServer.ConfigurationOptions#HA_LOG_DIR // */ @@ -335,6 +340,25 @@ } + { + haReleaseTimeConsensusTimeout = (Long) config + .getEntry( + HAJournalServer.ConfigurationOptions.COMPONENT, + HAJournalServer.ConfigurationOptions.HA_RELEASE_TIME_CONSENSUS_TIMEOUT, + Long.TYPE, + HAJournalServer.ConfigurationOptions.DEFAULT_HA_RELEASE_TIME_CONSENSUS_TIMEOUT); + + if (haReleaseTimeConsensusTimeout < HAJournalServer.ConfigurationOptions.MIN_HA_RELEASE_TIME_CONSENSUS_TIMEOUT) { + throw new ConfigurationException( + HAJournalServer.ConfigurationOptions.HA_RELEASE_TIME_CONSENSUS_TIMEOUT + + "=" + + haReleaseTimeConsensusTimeout + + " : must be GTE " + + HAJournalServer.ConfigurationOptions.MIN_HA_RELEASE_TIME_CONSENSUS_TIMEOUT); + } + + } + // HALog manager. 
haLogNexus = new HALogNexus(server, this, config); @@ -469,6 +493,11 @@ } + /** + * {@inheritDoc} + * + * @see HAJournalServer.ConfigurationOptions#HA_PREPARE_TIMEOUT + */ @Override public final long getHAPrepareTimeout() { @@ -476,6 +505,18 @@ } + /** + * {@inheritDoc} + * + * @see HAJournalServer.ConfigurationOptions#HA_RELEASE_TIME_CONSENSUS_TIMEOUT + */ + @Override + public final long getHAReleaseTimeConsensusTimeout() { + + return haReleaseTimeConsensusTimeout; + + } + // @Override // public final File getHALogDir() { // @@ -615,6 +656,95 @@ } @Override + protected void validateNewRootBlock(final boolean isJoined, + final boolean isLeader, final IRootBlockView oldRB, + final IRootBlockView newRB) { + + super.validateNewRootBlock(isJoined, isLeader, oldRB, newRB); + + if (isJoined && !isLeader) { + + /* + * Verify that the [lastLiveHAWriteMessage] is consistent with + * the proposed new root block. + * + * Note: The [lastLiveHAWriteMessage] is only tracked on the + * followers. Hence we do not use this code path for the leader. + */ + + final IHAWriteMessage msg = getHALogNexus().lastLiveHAWriteMessage; + + if (msg == null) { + + /* + * We should not go through a 2-phase commit without a write + * set. If there is a write set, then the + * lastLiveHAWriteMessage will not be null. + * + * Note: One possible explanation of this exception would be + * a concurrent local abort. That could discard the + * lastLiveHAWriteMessage. + */ + + throw new IllegalStateException("Commit without write set?"); + + } + + if (!msg.getUUID().equals(newRB.getUUID())) { + + /* + * The root block has a different UUID. We can not accept + * this condition. + */ + + throw new IllegalStateException("Store UUID: lastLiveMsg=" + + msg.getUUID() + " != newRB=" + newRB.getUUID()); + + } + + // Validate the new commit counter. + if ((msg.getCommitCounter() + 1) != newRB.getCommitCounter()) { + + /* + * Each message is tagged with the commitCounter for the + * last commit point on the disk.
The new root block must + * have a commit counter that is PLUS ONE when compared to + * the last live message. + */ + + throw new IllegalStateException( + "commitCounter: ( lastLiveMsg=" + + msg.getCommitCounter() + + " + 1 ) != newRB=" + + newRB.getCommitCounter()); + + } + + // Validate the write cache block sequence. + if ((msg.getSequence() + 1) != newRB.getBlockSequence()) { + + /* + * This checks two conditions: + * + * 1. The new root block must reflect each live + * HAWriteMessage received. + * + * 2. The service must not PREPARE until all expected + * HAWriteMessages have been received. + */ + + throw new IllegalStateException( + "blockSequence: lastLiveMsg=" + msg.getSequence() + + " + 1 != newRB=" + + newRB.getBlockSequence()); + + } + + } + + } + + @Override public IHALogRootBlocksResponse getHALogRootBlocksForWriteSet( final IHALogRootBlocksRequest msg) throws IOException { @@ -1578,22 +1708,27 @@ final StringBuilder innerRunStateStr = new StringBuilder(); if (innerRunState != null) { innerRunStateStr.append(innerRunState.name()); - switch (innerRunState) { - case Resync: - innerRunStateStr.append(" @ " - + journal.getRootBlockView().getCommitCounter()); - break; - case Operator: { - final String msg = server.getOperatorAlert(); - innerRunStateStr.append("msg=" + msg); - break; - } - default: - break; - } +// switch (innerRunState) { +// case Resync: +// innerRunStateStr.append(" @ " +// + journal.getRootBlockView().getCommitCounter()); +// break; +// case Operator: { +// final String msg = server.getOperatorAlert(); +// innerRunStateStr.append("msg=" + msg); +// break; +// } +// default: +// break; +// } } else { innerRunStateStr.append("N/A"); } + innerRunStateStr.append(" @ " + + journal.getRootBlockView().getCommitCounter()); + final String msg = server.getOperatorAlert(); + if (msg != null) + innerRunStateStr.append(", msg=[" + msg + "]"); return "{server=" + server.getRunState() + ", quorumService=" + innerRunStateStr + "}"; Modified: 
branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-05-13 15:13:37 UTC (rev 7124) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-05-13 15:21:20 UTC (rev 7125) @@ -179,6 +179,24 @@ long MIN_HA_PREPARE_TIMEOUT = 100; // milliseconds. /** + * The timeout in milliseconds that the leader will await the followers + * during the release time consensus protocol. + * <p> + * Note: The timeout must be set with a realistic expectation concerning + * the possibility of garbage collection. A long GC pause could + * otherwise cause the 2-phase commit to fail. With this in mind, a + * reasonable timeout is on the order of 10 seconds. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/623" > + * HA TXS / TXS Bottleneck </a> + */ + String HA_RELEASE_TIME_CONSENSUS_TIMEOUT = "haReleaseTimeConsensusTimeout"; + + long DEFAULT_HA_RELEASE_TIME_CONSENSUS_TIMEOUT = 10000; // milliseconds. + + long MIN_HA_RELEASE_TIME_CONSENSUS_TIMEOUT = 100; // milliseconds. + + /** * The property whose value is the name of the directory in which write * ahead log files will be created to support resynchronization services * trying to join an HA quorum (default {@value #DEFAULT_HA_LOG_DIR}). This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |