From: <tho...@us...> - 2012-10-11 17:55:46
|
Revision: 6671 http://bigdata.svn.sourceforge.net/bigdata/?rev=6671&view=rev Author: thompsonbry Date: 2012-10-11 17:55:39 +0000 (Thu, 11 Oct 2012) Log Message: ----------- Bug fix to AbstractJournal to go through a low level abort for the leader also when a quorum meets (corrects the previous commit). Modified HALogWriter/Reader to support the alternating root block pattern. 100% binary compatible journal and HALog files established through resynchronization protocol. Added the serviceId to the IHALogRequest and now checking it as part of the resynchronization protocol to make sure that we only accept the HALog replay messages that we requested. https://sourceforge.net/apps/trac/bigdata/ticket/530 (Journal HA) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HALogReader.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HALogWriter.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/msg/HALogRequest.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/msg/IHALogRequest.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/RootBlockUtility.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HALogReader.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HALogReader.java 2012-10-11 15:26:05 UTC (rev 6670) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HALogReader.java 2012-10-11 17:55:39 UTC (rev 6671) @@ -71,14 +71,8 @@ * closing root block has not been written and the data in the log * is useless). * - * The root block is slot 0 is always the root block for the - * previous commit point. - * - * The root block in slot 1 is either identical to the root block in - * slot zero (in which case the log file is logically empty) or it - * is the root block that closes the write set in the HA Log file - * (and its commit counter must be exactly one more than the commit - * counter for the opening root block). + * We figure out which root block is the opening root block based on + * standard logic. */ /* * Read the MAGIC and VERSION. @@ -107,9 +101,10 @@ true/* validateChecksum */, false/* alternateRootBlock */, false/* ignoreBadRootBlock */); - m_openRootBlock = tmp.rootBlock0; + m_closeRootBlock = tmp.chooseRootBlock(); - m_closeRootBlock = tmp.rootBlock1; + m_openRootBlock = tmp.rootBlock0 == m_closeRootBlock ? tmp.rootBlock1 + : tmp.rootBlock0; final long cc0 = m_openRootBlock.getCommitCounter(); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HALogWriter.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HALogWriter.java 2012-10-11 15:26:05 UTC (rev 6670) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HALogWriter.java 2012-10-11 17:55:39 UTC (rev 6671) @@ -333,8 +333,18 @@ flush(); // current streamed data - // The closing root block is always in slot 1. - writeRootBlock(false/* isRootBlock0 */, rootBlock); + /* + * The closing root block is written into which ever slot corresponds to + * its whether that root block is root block zero. Both root blocks are + * identical up to this point, so we can write the closing root block + * into either slot. HALogReader will use the commit counters to figure + * out which root block is the opening root block and which root block + * is the closing root block. + */ + writeRootBlock(rootBlock.isRootBlock0(), rootBlock); + +// // The closing root block is always in slot 1. +// writeRootBlock(false/* isRootBlock0 */, rootBlock); close(); @@ -375,8 +385,8 @@ * then close the file and return immediately */ if (m_rootBlock.getCommitCounter() != msg.getCommitCounter()) - throw new IllegalStateException("lastCommitTime=" - + m_rootBlock.getLastCommitTime() + ", but msg=" + msg); + throw new IllegalStateException("commitCounter=" + + m_rootBlock.getCommitCounter() + ", but msg=" + msg); if (m_rootBlock.getLastCommitTime() != msg.getLastCommitTime()) throw new IllegalStateException("lastCommitTime=" Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/msg/HALogRequest.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/msg/HALogRequest.java 2012-10-11 15:26:05 UTC (rev 6670) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/msg/HALogRequest.java 2012-10-11 17:55:39 UTC (rev 6671) @@ -23,6 +23,8 @@ */ package com.bigdata.ha.msg; +import java.util.UUID; + public class HALogRequest implements IHALogRequest { /** @@ -30,16 +32,19 @@ */ private static final long serialVersionUID = 1L; + private final UUID serviceId; private final long commitCounter; /** - * + * @param serviceId + * The {@link UUID} of the service that made the request. * @param commitCounter * The commit counter used to identify the desired commit point * (the commit counter of the closing root block). */ - public HALogRequest(final long commitCounter) { + public HALogRequest(final UUID serviceId, final long commitCounter) { + this.serviceId = serviceId; this.commitCounter = commitCounter; } @@ -51,10 +56,18 @@ } - public String toString() { + @Override + public UUID getServiceId() { - return getClass() + "{commitCounter=" + getCommitCounter() + "}"; + return serviceId; } + + public String toString() { + + return getClass() + "{serviceId=" + getServiceId() + ", commitCounter=" + + getCommitCounter() + "}"; + + } } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/msg/IHALogRequest.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/msg/IHALogRequest.java 2012-10-11 15:26:05 UTC (rev 6670) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/msg/IHALogRequest.java 2012-10-11 17:55:39 UTC (rev 6671) @@ -23,6 +23,8 @@ */ package com.bigdata.ha.msg; +import java.util.UUID; + /** * Message requesting the root blocks and other metadata for an HA Log file. */ @@ -34,4 +36,8 @@ */ long getCommitCounter(); + /** + * The UUID of the service that issued this request. + */ + UUID getServiceId(); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-10-11 15:26:05 UTC (rev 6670) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-10-11 17:55:39 UTC (rev 6671) @@ -4790,8 +4790,9 @@ // This quorum member. final QuorumService<HAGlue> localService = quorum.getClient(); - - if (_rootBlock.getCommitCounter() == 0 + + if (localService.isJoinedMember(quorumToken) + && _rootBlock.getCommitCounter() == 0 && localService.isFollower(quorumToken)) { /* @@ -4810,35 +4811,13 @@ throw new RuntimeException(e); } + // Installs the root blocks and does a local abort. localService.installRootBlocksFromQuorum(tmp); - - } - - if (localService.isJoinedMember(quorumToken)) { - /* - * We need to reset the backing store with the token for the - * new quorum. There should not be any active writers since - * there was no quorum. Thus, this should just cause the - * backing store to become aware of the new quorum and - * enable writes. - * - * Note: This is done using a local abort, not a 2-phase - * abort. Each node in the quorum should handle this locally - * when it sees the quorum meet event. - * - * TODO This assumes that a service that is not joined with - * the quorum will not go through an _abort(). Such a - * service will have to go through the synchronization - * protocol. If the service is in the pipeline when the - * quorum meets, even through it is not joined, and votes - * the same lastCommitTime, then it MIGHT see all necessary - * replicated writes and if it does, then it could - * synchronize immediately. There is basically a data race - * here. - */ + } else { - _abort(); + // The leader also needs to do a local abort. + doLocalAbort(); } @@ -4914,6 +4893,27 @@ log.info("Synchronized root blocks with qourum: rootBlock=" + _rootBlock); + /* + * We need to reset the backing store with the token for the new quorum. + * There should not be any active writers since there was no quorum. + * Thus, this should just cause the backing store to become aware of the + * new quorum and enable writes. + * + * Note: This is done using a local abort, not a 2-phase abort. Each + * node in the quorum should handle this locally when it sees the quorum + * meet event. + * + * TODO This assumes that a service that is not joined with the quorum + * will not go through an _abort(). Such a service will have to go + * through the synchronization protocol. If the service is in the + * pipeline when the quorum meets, even through it is not joined, and + * votes the same lastCommitTime, then it MIGHT see all necessary + * replicated writes and if it does, then it could synchronize + * immediately. There is basically a data race here. + */ + + _abort(); + } /** Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/RootBlockUtility.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/RootBlockUtility.java 2012-10-11 15:26:05 UTC (rev 6670) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/RootBlockUtility.java 2012-10-11 17:55:39 UTC (rev 6671) @@ -163,6 +163,26 @@ * Note: For historical compatibility, <code>rootBlock1</code> is chosen if * both root blocks have the same {@link IRootBlockView#getCommitCounter()}. * + * @return The chosen root block. + * + * @throws RuntimeException + * if no root block satisfies the criteria. + */ + public IRootBlockView chooseRootBlock() { + + return chooseRootBlock(rootBlock0, rootBlock1, + false/* alternateRootBlock */, false/* ignoreBadRootBlock */); + + } + + /** + * Return the chosen root block. The root block having the greater + * {@link IRootBlockView#getCommitCounter() commit counter} is chosen by + * default. + * <p> + * Note: For historical compatibility, <code>rootBlock1</code> is chosen if + * both root blocks have the same {@link IRootBlockView#getCommitCounter()}. + * * @param rootBlock0 * Root block 0 (may be <code>null</code> if this root block is * bad). Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-10-11 15:26:05 UTC (rev 6670) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-10-11 17:55:39 UTC (rev 6671) @@ -916,6 +916,10 @@ * Note: We need to discard any writes that might have been * buffered before we start the resynchronization of the local * store. + * + * TODO This might not be necessary. We do a low-level abort + * when we install the root blocks from the quorum leader before + * we sync the first commit point. */ journal.doLocalAbort(); @@ -1061,7 +1065,7 @@ try { ft = leader.sendHALogForWriteSet(new HALogRequest( - commitCounter)); + server.serviceUUID, commitCounter)); // Wait until all write cache blocks are received. ft.get(); @@ -1113,8 +1117,13 @@ if (resyncFuture != null && !resyncFuture.isDone()) { + /* + * If we are resynchronizing, then pass ALL messages (both + * live and historical) into handleResyncMessage(). + */ + setExtent(msg); - handleResyncMessage(msg, data); + handleResyncMessage(req, msg, data); } else if (commitCounter == msg.getCommitCounter() && isJoinedMember(msg.getQuorumToken())) { @@ -1190,30 +1199,18 @@ * * @throws InterruptedException * @throws IOException - * - * FIXME RESYNC : There is only one {@link HALogWriter}. It - * can only have one file open. We need to explicitly - * coordinate which log file is open when so we never - * attempt to write a cache block on the write log file (or - * one that is out of sequence). [Consider making those - * things errors in the {@link HALogWriter} - it quietly - * ignores this right now.] */ - private void handleResyncMessage(final IHAWriteMessage msg, - final ByteBuffer data) throws IOException, InterruptedException { + private void handleResyncMessage(final IHALogRequest req, + final IHAWriteMessage msg, final ByteBuffer data) + throws IOException, InterruptedException { logLock.lock(); try { /* - * FIXME RESYNC : Review the transition conditions. [msg.seq+q == - * log.nextSeq] implies that we have observed and logged this - * write block already. That is the duplicate write cache block - * that let's us know that we are fully synchronized with the - * quorum. - * - * FIXME RESYNC : Review when (and where) we open and close log files. + * TODO RESYNC : Review when (and where) we open and close log + * files. */ final HALogWriter logWriter = journal.getHALogWriter(); @@ -1221,36 +1218,64 @@ final long journalCommitCounter = journal.getRootBlockView() .getCommitCounter(); - if (msg.getCommitCounter() == journalCommitCounter - && msg.getSequence() + 1 == logWriter.getSequence()) { + if (req == null) { + + /* + * Live message. + */ + if (msg.getCommitCounter() == journalCommitCounter + && msg.getSequence() + 1 == logWriter.getSequence()) { + + /* + * We just received the last resync message that we need + * to join the met quorum. + */ + + resyncTransitionToMetQuorum(msg, data); + + return; + + } else { + + /* + * Drop live messages since we are not caught up. + */ + + if (haLog.isDebugEnabled()) + log.debug("Ignoring write cache block: msg=" + msg); + + return; + + } + + } else { + /* - * We just received the last resync message that we need to - * join the met quorum. + * A historical message (replay of an HALog file). + * + * Note: We will see ALL messages. We can only log the + * message if it is for our commit point. */ - - resyncTransitionToMetQuorum(msg,data); - - return; - - } - /* - * Log the message and write cache block. - */ + if (!server.serviceUUID.equals(req.getServiceId())) { - if (logWriter.getCommitCounter() != msg.getCommitCounter()) { + /* + * Not our request. Drop the message. + */ + + if (haLog.isDebugEnabled()) + log.debug("Ignoring write cache block: msg=" + msg); - if (haLog.isDebugEnabled()) - log.debug("Ignoring write cache block: msg=" + msg); + return; - return; + } + // log and write cache block. + acceptHAWriteMessage(msg, data); + } - // log and write cache block. - acceptHAWriteMessage(msg, data); - } finally { logLock.unlock(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |