From: <tho...@us...> - 2012-09-06 17:36:22
|
Revision: 6542 http://bigdata.svn.sourceforge.net/bigdata/?rev=6542&view=rev Author: thompsonbry Date: 2012-09-06 17:36:15 +0000 (Thu, 06 Sep 2012) Log Message: ----------- Modified the HA code to support the WORM. The common interface is IHABufferStrategy. Code that was specific to the IRWStrategy, RWStrategy, or RWStore has been generalized to IHABufferStrategy. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/IHABufferStrategy.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-09-06 16:19:38 UTC (rev 6541) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-09-06 17:36:15 UTC (rev 6542) @@ -4932,9 +4932,8 @@ // set the new root block. _rootBlock = rootBlock; - if (_bufferStrategy instanceof RWStrategy - && quorum.getMember().isFollower( - rootBlock.getQuorumToken())) { + if (quorum.getMember().isFollower( + rootBlock.getQuorumToken())) { /* * Ensure allocators are synced after commit. This * is only done for the followers. The leader has @@ -4943,11 +4942,11 @@ * updating the allocators. 
*/ if (haLog.isInfoEnabled()) - haLog.error("Reloading allocators: serviceUUID=" + haLog.error("Reset from root block: serviceUUID=" + quorum.getMember().getServiceId()); - ((RWStrategy) _bufferStrategy).getStore() + ((IHABufferStrategy) _bufferStrategy) .resetFromHARootBlock(_rootBlock); - } + } // reload the commit record from the new root block. _commitRecord = _getCommitRecord(); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/IHABufferStrategy.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/IHABufferStrategy.java 2012-09-06 16:19:38 UTC (rev 6541) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/IHABufferStrategy.java 2012-09-06 17:36:15 UTC (rev 6542) @@ -68,4 +68,10 @@ void setExtentForLocalStore(final long extent) throws IOException, InterruptedException; + /** + * Called from {@link AbstractJournal} commit2Phase to ensure it is able to + * read committed data that has been streamed directly to the backing store. 
+ */ + public void resetFromHARootBlock(final IRootBlockView rootBlock); + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2012-09-06 16:19:38 UTC (rev 6541) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2012-09-06 17:36:15 UTC (rev 6542) @@ -774,9 +774,9 @@ return m_store.getOutputStream(context); } -// @Override -// public void resetFromHARootBlock(final IRootBlockView rootBlock) { -// m_store.resetFromHARootBlock(rootBlock); -// } + @Override + public void resetFromHARootBlock(final IRootBlockView rootBlock) { + m_store.resetFromHARootBlock(rootBlock); + } } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2012-09-06 16:19:38 UTC (rev 6541) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2012-09-06 17:36:15 UTC (rev 6542) @@ -186,15 +186,15 @@ /** * The service responsible for migrating dirty records onto the backing file * and (for HA) onto the other members of the {@link Quorum}. + * <p> + * This MAY be <code>null</code> for a read-only store or if the write cache + * is disabled. It is required for HA. * - * @todo This MAY be <code>null</code> for a read-only store or if the write - * cache is disabled. - * - * @todo Is HA read-only allowed? If so, then since the - * {@link WriteCacheService} handles failover reads it should be - * enabled for HA read-only. + * TODO This should not really be volatile. For HA, we wind up needing to + * set a new value on this field in {@link #abort()}. The field used to + * be final. Perhaps an {@link AtomicReference} would be appropriate now? 
*/ - private final WriteCacheService writeCacheService; + private volatile WriteCacheService writeCacheService; /** * <code>true</code> iff the backing store has record level checksums. @@ -202,6 +202,13 @@ private final boolean useChecksums; /** + * The #of write cache buffers to use. + * + * @see FileMetadata#writeCacheBufferCount + */ + private final int writeCacheBufferCount; + + /** * <code>true</code> if the backing store will be used in an HA * {@link Quorum} (this is passed through to the {@link WriteCache} objects * which use this flag to conditionally track the checksum of the entire @@ -883,6 +890,8 @@ * handles the write pipeline to the downstream quorum members). */ // final Quorum<?,?> quorum = quorumRef.get(); + + this.writeCacheBufferCount = fileMetadata.writeCacheBufferCount; isHighlyAvailable = quorum != null && quorum.isHighlyAvailable(); @@ -894,40 +903,34 @@ /* * WriteCacheService. */ - try { - this.writeCacheService = new WriteCacheService( - fileMetadata.writeCacheBufferCount, useChecksums, - extent, opener, quorum) { - @Override - public WriteCache newWriteCache(final IBufferAccess buf, - final boolean useChecksum, - final boolean bufferHasData, - final IReopenChannel<? extends Channel> opener, - final long fileExtent) - throws InterruptedException { - return new WriteCacheImpl(0/* baseOffset */, buf, - useChecksum, bufferHasData, - (IReopenChannel<FileChannel>) opener, - fileExtent); - } - }; - this._checkbuf = null; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + this.writeCacheService = newWriteCacheService(); + this._checkbuf = null; } else { this.writeCacheService = null; this._checkbuf = useChecksums ? ByteBuffer.allocateDirect(4) : null; } -// System.err.println("WARNING: alpha impl: " -// + this.getClass().getName() -// + (writeCacheService != null ? 
" : writeCacheBuffers=" -// + fileMetadata.writeCacheBufferCount : " : No cache") -// + ", useChecksums=" + useChecksums); + } + private WriteCacheService newWriteCacheService() { + try { + return new WriteCacheService(writeCacheBufferCount, useChecksums, + extent, opener, quorum) { + @Override + public WriteCache newWriteCache(final IBufferAccess buf, + final boolean useChecksum, final boolean bufferHasData, + final IReopenChannel<? extends Channel> opener, + final long fileExtent) throws InterruptedException { + return new WriteCacheImpl(0/* baseOffset */, buf, + useChecksum, bufferHasData, + (IReopenChannel<FileChannel>) opener, fileExtent); + } + }; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } - + /** * Implementation coordinates writes using the read lock of the * {@link DiskOnlyStrategy#extensionLock}. This is necessary in order to @@ -1076,8 +1079,24 @@ if (writeCacheService != null) { try { + if (quorum != null) { + /** + * When the WORMStrategy is part of an HA quorum, we need to + * close out and then reopen the WriteCacheService every + * time the quorum token is changed. For convenience, this + * is handled by extending the semantics of abort() on the + * Journal and reset() on the WORMStrategy. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/530"> + * HA Journal </a> + */ + writeCacheService.close(); + writeCacheService = newWriteCacheService(); + } else { writeCacheService.reset(); writeCacheService.setExtent(extent); + } } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -2253,10 +2272,21 @@ public void writeRawBuffer(final HAWriteMessage msg, final IBufferAccess b) throws IOException, InterruptedException { - writeCacheService.newWriteCache(b, useChecksums, - true/* bufferHasData */, opener, msg.getFileExtent()).flush( - false/* force */); - + /* + * Wrap up the data from the message as a WriteCache object. 
This will + * build up a RecordMap containing the allocations to be made, and + * including a ZERO (0) data length if any offset winds up being deleted + * (released). + */ + final WriteCache writeCache = writeCacheService.newWriteCache(b, + useChecksums, true/* bufferHasData */, opener, + msg.getFileExtent()); + + /* + * Flush the scattered writes in the write cache to the backing + * store. + */ + writeCache.flush(false/* force */); } public void setExtentForLocalStore(final long extent) throws IOException, @@ -2266,4 +2296,10 @@ } + public void resetFromHARootBlock(final IRootBlockView rootBlock) { + + nextOffset.set(rootBlock.getNextOffset()); + + } + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2012-09-06 16:19:38 UTC (rev 6541) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2012-09-06 17:36:15 UTC (rev 6542) @@ -125,11 +125,21 @@ switch (bufferMode) { case DiskRW: break; + case DiskWORM: + break; default: throw new IllegalArgumentException(Options.BUFFER_MODE + "=" + bufferMode + " : does not support HA"); } + final boolean writeCacheEnabled = Boolean.valueOf(properties + .getProperty(Options.WRITE_CACHE_ENABLED, + Options.DEFAULT_WRITE_CACHE_ENABLED)); + + if (!writeCacheEnabled) + throw new IllegalArgumentException(Options.WRITE_CACHE_ENABLED + + " : must be true."); + if (properties.get(Options.WRITE_PIPELINE_ADDR) == null) { throw new RuntimeException(Options.WRITE_PIPELINE_ADDR Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-09-06 
16:19:38 UTC (rev 6541) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-09-06 17:36:15 UTC (rev 6542) @@ -37,7 +37,7 @@ import com.bigdata.jini.start.config.ZookeeperClientConfig; import com.bigdata.jini.util.JiniUtil; import com.bigdata.journal.AbstractJournal; -import com.bigdata.journal.RWStrategy; +import com.bigdata.journal.IHABufferStrategy; import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumActor; @@ -393,7 +393,7 @@ && quorum.getMember().isLeader( e.token())) { try { - System.err + System.out// TODO LOG @ INFO .println("Starting NSS"); startNSS(); } catch (Exception e1) { @@ -420,7 +420,7 @@ quorum.start(newQuorumService(logicalServiceId, serviceUUID, haGlueService, journal)); - final QuorumActor actor = quorum.getActor(); + final QuorumActor<?,?> actor = quorum.getActor(); actor.memberAdd(); actor.pipelineAdd(); actor.castVote(journal.getLastCommitTime()); @@ -560,8 +560,8 @@ } }; - ((RWStrategy) journal.getBufferStrategy()).writeRawBuffer(msg, - b); + ((IHABufferStrategy) journal.getBufferStrategy()) + .writeRawBuffer(msg, b); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |