From: <tho...@us...> - 2012-10-03 18:01:13
Revision: 6649 http://bigdata.svn.sourceforge.net/bigdata/?rev=6649&view=rev Author: thompsonbry Date: 2012-10-03 18:01:03 +0000 (Wed, 03 Oct 2012) Log Message: ----------- Working w/ martyn on the resynchronization protocol. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumReadImpl.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/ha/HAWriteMessage.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2012-10-03 17:56:13 UTC (rev 6648) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2012-10-03 18:01:03 UTC (rev 6649) @@ -1,3 +1,26 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ package com.bigdata.ha; import java.io.IOException; Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2012-10-03 17:56:13 UTC (rev 6648) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2012-10-03 18:01:03 UTC (rev 6649) @@ -114,4 +114,17 @@ */ long getLastCommitCounter(); + /** + * Log the {@link HAWriteMessage} and the associated data (if necessary). + * The log file for the current write set will be deleted if the quorum is + * fully met at the next 2-phase commit. + * + * @param msg + * The {@link HAWriteMessage}. + * @param data + * The {@link WriteCache} block. 
+ */ + void logWriteCacheBlock(final HAWriteMessage msg, + final ByteBuffer data) throws IOException; + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2012-10-03 17:56:13 UTC (rev 6648) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2012-10-03 18:01:03 UTC (rev 6649) @@ -1005,6 +1005,10 @@ abstract protected void handleReplicatedWrite(final HAWriteMessage msg, final ByteBuffer data) throws Exception; + @Override + abstract public void logWriteCacheBlock(final HAWriteMessage msg, + final ByteBuffer data) throws IOException; + /** * A utility class that bundles together the Internet address and port at which * the downstream service will accept and relay cache blocks for the write Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumReadImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumReadImpl.java 2012-10-03 17:56:13 UTC (rev 6648) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumReadImpl.java 2012-10-03 18:01:03 UTC (rev 6649) @@ -1,3 +1,26 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ package com.bigdata.ha; import java.io.IOException; Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2012-10-03 17:56:13 UTC (rev 6648) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2012-10-03 18:01:03 UTC (rev 6649) @@ -28,11 +28,8 @@ package com.bigdata.ha; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.ObjectOutputStream; import java.nio.ByteBuffer; -import java.util.Formatter; import java.util.UUID; import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -41,7 +38,6 @@ import org.apache.log4j.Logger; -import com.bigdata.io.writecache.WriteCacheService; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.IResourceManager; import com.bigdata.journal.IRootBlockView; @@ -126,6 +122,14 @@ } + @Override + public void logWriteCacheBlock(final HAWriteMessage msg, + final ByteBuffer data) throws IOException { + + QuorumServiceBase.this.logWriteCacheBlock(msg, data); + + } + }); addListener(this.commitImpl = new QuorumCommitImpl<S>(this)); @@ -224,93 +228,14 @@ * Log the {@link HAWriteMessage} and the associated data onto the * appropriate log file. * <p> - * Note: Logging MUST NOT start in the middle of a write set (all log files - * must be complete). The {@link HAWriteMessage#getSequence()} MUST be ZERO - * (0) when we open a log file. - * - * TODO The WORM should not bother to log the write cache block since it can - * be obtained directly from the backing store. 
Abstract out an object to - * manage the log file for a commit counter and make it smart about the WORM - * versus the RWStore. The same object will need to handle the read back - * from the log file and in the case of the WORM should get the data block - * from the backing file. However, this object is not responsible for reply - * of log files. We'll have to locate a place for that logic next - probably - * in this class (QuorumServiceBase). - * - * FIXME NOTHING IS CALLING THIS CODE YET! Invoke from - * {@link #handleReplicatedWrite(HAWriteMessage, ByteBuffer)} and from - * {@link WriteCacheService}s WriteTask.call() method (on the leader). + * The default implementation is a NOP. */ public void logWriteCacheBlock(final HAWriteMessage msg, final ByteBuffer data) throws IOException { -// final long currentCommitCounter = getLastCommitCounter(); - - getQuorum().assertQuorum(msg.getQuorumToken()); - - if (msg.getSequence() == 0L) { - - if (processLog != null) { - processLog.close(); - processLog = null; - } - - /* - * The commit counter that will be used to identify the file. - * - * Note: We use commitCounter+1 so the file will be labeled by the - * commit point that will be achieved when that log file is applied - * to a journal whose current commit point is [commitCounter]. - */ - final long commitCounter = msg.getCommitCounter() + 1; - - /* - * Format the name of the log file. - * - * Note: The commit counter in the file name should be zero filled - * to 20 digits so we have the files in lexical order in the file - * system (for convenience). - */ - final String logFile; - { - - final StringBuilder sb = new StringBuilder(); - - final Formatter f = new Formatter(sb); - - f.format("%020d.log", commitCounter); - - logFile = sb.toString(); - - } - - // Establish new log file. - processLog = new ObjectOutputStream(new FileOutputStream(new File( - getHALogDir(), logFile))); - - } - - /* - * FIXME We need to track whether or not we began the sequence at ZERO - * (0). 
If we did, then we can open a log and start writing for the - * current commitCounter. We do need to keep track of the commit counter - * associated with the log file so we can correctly refuse to log blocks - * on the log file that are associated with a different commit counter. - * We also need to manage the abort() and commit() transitions, ensuring - * that we truncate() the log for abort() (assuming it is for the same - * commit counter) and that we append the root block, force() and - * close() the log for commit. - */ + // NOP } - - /** - * Process log to which the receiveService should write the messages to and - * <code>null</code> if we may not write on it. - * - * FIXME We need to clear this any time the quorum breaks. - */ - private ObjectOutputStream processLog = null; /* * QuorumCommit. Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2012-10-03 17:56:13 UTC (rev 6648) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2012-10-03 18:01:03 UTC (rev 6649) @@ -618,11 +618,12 @@ ); /* - * FIXME The quorum leader must log the write cache - * block. However, it must be logged exactly once - * (if there is a retry, we do not want to re-log - * the block!) + * The quorum leader must log the write cache block. + * + * Note: It will be logged exactly once (retry send + * will not re-log the block). 
*/ + quorumMember.logWriteCacheBlock(msg, b.duplicate()); remoteWriteFuture = quorumMember.replicate(msg, b); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/ha/HAWriteMessage.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/ha/HAWriteMessage.java 2012-10-03 17:56:13 UTC (rev 6648) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/ha/HAWriteMessage.java 2012-10-03 18:01:03 UTC (rev 6649) @@ -52,7 +52,7 @@ private long commitCounter; /** The most recent commit time associated with this message */ - private long commitTime; + private long lastCommitTime; /** The write sequence since last commit beginning at zero */ private long sequence; @@ -75,8 +75,8 @@ } /** The commit time associated with this message. */ - public long getCommitTime() { - return commitTime; + public long getLastCommitTime() { + return lastCommitTime; } /** @@ -114,7 +114,7 @@ + "{size=" + getSize() // + ",chksum=" + getChk() // + ",commitCounter=" + commitCounter // - + ",commitTime=" + commitTime // + + ",commitTime=" + lastCommitTime // + ",sequence=" + sequence // + ",storeType=" + getStoreType() // + ",quorumToken=" + getQuorumToken()// @@ -166,7 +166,7 @@ this.commitCounter = commitCounter; - this.commitTime = commitTime; + this.lastCommitTime = commitTime; this.sequence = sequence; @@ -197,7 +197,7 @@ final HAWriteMessage other = (HAWriteMessage) obj; return commitCounter == other.getCommitCounter() - && commitTime == other.getCommitTime() // + && lastCommitTime == other.getLastCommitTime() // && sequence == other.getSequence() && storeType == other.getStoreType() && quorumToken == other.getQuorumToken() @@ -219,7 +219,7 @@ } storeType = StoreTypeEnum.valueOf(in.readByte()); commitCounter = in.readLong(); - commitTime = in.readLong(); + lastCommitTime = in.readLong(); sequence = in.readLong(); quorumToken = in.readLong(); fileExtent = 
in.readLong(); @@ -231,7 +231,7 @@ out.write(VERSION0); out.writeByte(storeType.getType()); out.writeLong(commitCounter); - out.writeLong(commitTime); + out.writeLong(lastCommitTime); out.writeLong(sequence); out.writeLong(quorumToken); out.writeLong(fileExtent); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2012-10-03 17:56:13 UTC (rev 6648) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2012-10-03 18:01:03 UTC (rev 6649) @@ -314,7 +314,15 @@ return MyMockQuorumMember.this.getLastCommitCounter(); } - + + @Override + public void logWriteCacheBlock(final HAWriteMessage msg, + final ByteBuffer data) throws IOException { + + MyMockQuorumMember.this.logWriteCacheBlock(msg, data); + + } + }); } @@ -387,6 +395,12 @@ private long lastCommitCounter = 0; private long lastCommitTime = 0; + @Override + public void logWriteCacheBlock(final HAWriteMessage msg, + final ByteBuffer data) throws IOException { + // NOP. 
+ } + } // MockQuorumMemberImpl /** Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-10-03 17:56:13 UTC (rev 6648) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-10-03 18:01:03 UTC (rev 6649) @@ -1,5 +1,9 @@ package com.bigdata.journal.jini.ha; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URL; @@ -7,6 +11,7 @@ import java.rmi.Remote; import java.rmi.RemoteException; import java.rmi.server.ServerNotActiveException; +import java.util.Formatter; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -31,12 +36,12 @@ import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAGlueDelegate; +import com.bigdata.ha.ProcessLogWriter; import com.bigdata.ha.QuorumService; import com.bigdata.ha.QuorumServiceBase; import com.bigdata.io.IBufferAccess; import com.bigdata.jini.start.config.ZookeeperClientConfig; import com.bigdata.jini.util.JiniUtil; -import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.IHABufferStrategy; import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.quorum.Quorum; @@ -133,7 +138,7 @@ * Caching discovery client for the {@link HAGlue} services. */ private HAJournalDiscoveryClient discoveryClient; - + /** * The journal. */ @@ -171,6 +176,15 @@ */ private Server jettyServer; + /** + * Caching discovery client for the {@link HAGlue} services. 
+ */ + public HAJournalDiscoveryClient getDiscoveryClient() { + + return discoveryClient; + + } + public HAJournalServer(final String[] args, final LifeCycle lifeCycle) { super(args, lifeCycle); @@ -503,112 +517,189 @@ /** * Factory for the {@link QuorumService} implementation. * + * @param logicalServiceZPath + * @param serviceId * @param remoteServiceImpl * The object that implements the {@link Remote} interfaces * supporting HA operations. + * @param store + * The {@link HAJournal}. */ - private QuorumServiceBase<HAGlue, AbstractJournal> newQuorumService( + private QuorumServiceBase<HAGlue, HAJournal> newQuorumService( final String logicalServiceZPath, final UUID serviceId, final HAGlue remoteServiceImpl, - final AbstractJournal store) { + final HAJournal store) { - return new QuorumServiceBase<HAGlue, AbstractJournal>( - logicalServiceZPath, serviceId, remoteServiceImpl, store) { + return new HAQuorumService<HAGlue, HAJournal>(logicalServiceZPath, + serviceId, remoteServiceImpl, store, this); - @Override - public void start(final Quorum<?,?> quorum) { - - super.start(quorum); + } - // Inform the Journal about the current token (if any). - journal.setQuorumToken(quorum.token()); - - } + /** + * Concrete {@link QuorumServiceBase} implementation for the + * {@link HAJournal}. + * + * @param logicalServiceZPath + * @param serviceId + * @param remoteServiceImpl + * The object that implements the {@link Remote} interfaces + * supporting HA operations. + * @param store + * The {@link HAJournal}. 
+ */ + static private class HAQuorumService<S extends HAGlue, L extends HAJournal> + extends QuorumServiceBase<S, L> { + + private final L journal; + private final HAJournalServer server; + + public HAQuorumService(final String logicalServiceZPath, + final UUID serviceId, final S remoteServiceImpl, final L store, + final HAJournalServer server) { + + super(logicalServiceZPath, serviceId, remoteServiceImpl, store); + + this.journal = store; - @Override - public void quorumBreak() { + this.server = server; - super.quorumBreak(); - - // Inform the Journal that the quorum token is invalid. - journal.setQuorumToken(Quorum.NO_QUORUM); - - } + } - @Override - public void quorumMeet(final long token, final UUID leaderId) { + @Override + public void start(final Quorum<?,?> quorum) { + + super.start(quorum); - super.quorumMeet(token, leaderId); - - // Inform the journal that there is a new quorum token. - journal.setQuorumToken(token); + // Inform the Journal about the current token (if any). + journal.setQuorumToken(quorum.token()); + + } + + @Override + public void quorumBreak() { - } + super.quorumBreak(); - /** - * Resolve an {@link HAGlue} object from its Service UUID. - */ - @Override - public HAGlue getService(final UUID serviceId) { - - final ServiceItem serviceItem = discoveryClient - .getServiceItem(serviceId); + // Inform the Journal that the quorum token is invalid. + journal.setQuorumToken(Quorum.NO_QUORUM); + + } - if (serviceItem == null) { + @Override + public void quorumMeet(final long token, final UUID leaderId) { - // Not found (per the API). - throw new QuorumException("Service not found: uuid=" - + serviceId); + super.quorumMeet(token, leaderId); + + // Inform the journal that there is a new quorum token. + journal.setQuorumToken(token); - } + } + + /** + * Resolve an {@link HAGlue} object from its Service UUID. 
+ */ + @Override + public S getService(final UUID serviceId) { + + final HAJournalDiscoveryClient discoveryClient = server + .getDiscoveryClient(); - return (HAGlue) serviceItem.service; - + final ServiceItem serviceItem = discoveryClient + .getServiceItem(serviceId); + + if (serviceItem == null) { + + // Not found (per the API). + throw new QuorumException("Service not found: uuid=" + + serviceId); + } - @Override - protected void handleReplicatedWrite(final HAWriteMessage msg, - final ByteBuffer data) throws Exception { + return (S) serviceItem.service; + + } - if (haLog.isDebugEnabled()) - haLog.debug("msg=" + msg + ", buf=" + data); + @Override + protected void handleReplicatedWrite(final HAWriteMessage msg, + final ByteBuffer data) throws Exception { - /* - * Note: the ByteBuffer is owned by the HAReceiveService. This - * just wraps up the reference to the ByteBuffer with an - * interface that is also used by the WriteCache to control - * access to ByteBuffers allocated from the DirectBufferPool. - * However, release() is a NOP on this implementation since the - * ByteBuffer is owner by the HAReceiveService. - */ - final IBufferAccess b = new IBufferAccess() { + if (haLog.isDebugEnabled()) + haLog.debug("msg=" + msg + ", buf=" + data); - @Override - public void release(long timeout, TimeUnit unit) - throws InterruptedException { - // NOP - } + /* + * Log the message and write cache block. + */ + logWriteCacheBlock(msg, data); + + /* + * Note: the ByteBuffer is owned by the HAReceiveService. This + * just wraps up the reference to the ByteBuffer with an + * interface that is also used by the WriteCache to control + * access to ByteBuffers allocated from the DirectBufferPool. + * However, release() is a NOP on this implementation since the + * ByteBuffer is owner by the HAReceiveService. 
+ */ + final IBufferAccess b = new IBufferAccess() { - @Override - public void release() throws InterruptedException { - // NOP - } + @Override + public void release(long timeout, TimeUnit unit) + throws InterruptedException { + // NOP + } - @Override - public ByteBuffer buffer() { - return data; - } - }; + @Override + public void release() throws InterruptedException { + // NOP + } - ((IHABufferStrategy) journal.getBufferStrategy()) - .writeRawBuffer(msg, b); + @Override + public ByteBuffer buffer() { + return data; + } + }; - } + ((IHABufferStrategy) journal.getBufferStrategy()) + .writeRawBuffer(msg, b); - }; + } + /** + * {@inheritDoc} + * <p> + * Note: Logging MUST NOT start in the middle of a write set (all log files + * must be complete). The {@link HAWriteMessage#getSequence()} MUST be ZERO + * (0) when we open a log file. + * + * TODO The WORM should not bother to log the write cache block since it can + * be obtained directly from the backing store. Abstract out an object to + * manage the log file for a commit counter and make it smart about the WORM + * versus the RWStore. The same object will need to handle the read back + * from the log file and in the case of the WORM should get the data block + * from the backing file. However, this object is not responsible for reply + * of log files. We'll have to locate a place for that logic next - probably + * in this class (QuorumServiceBase). + * + * FIXME We need to get the root block on the log and integrate it into the + * commit protocol. Everyone in the pipeline needs to get the root block. + * + * FIXME The class that handles the logging must not write a block twice, + * even if it gets transmitted twice. + */ + public void logWriteCacheBlock(final HAWriteMessage msg, + final ByteBuffer data) throws IOException { + + } + + /** + * Process log to which the receiveService should write the messages to and + * <code>null</code> if we may not write on it. 
+ * + * FIXME We need to clear this any time the quorum breaks. + */ + ProcessLogWriter haLogWriter = null; + } - + /** * Setup and start the {@link NanoSparqlServer}. * <p> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.