From: <tho...@us...> - 2012-10-03 15:13:55
Revision: 6643 http://bigdata.svn.sourceforge.net/bigdata/?rev=6643&view=rev Author: thompsonbry Date: 2012-10-03 15:13:47 +0000 (Wed, 03 Oct 2012) Log Message: ----------- Working through the resynchronization protocol with Martyn for the HA Journal. - done. HAWriteMessageBase - added a version code (short). Added equals() and hashCode() methods. Added equals() on HAWriteMessage. Added the commit counter, commit time, and write block sequence fields. - done. HAWriteMessage: added commitCounter, lastCommitTime, and writeCacheSequence (starting at zero). This is integrated into the RWStore and WORMStrategy. - done. HAWriteMessage: unit test of the serialization format. This is integrated into CI. - done. HAJournal: added a required configuration option for the HA_LOG_DIR. This is the directory in which the HA write messages, write cache blocks, and root blocks will be logged. There is one such log file per commit point. The log files are deleted at the commit, unless the quorum is not fully met, in which case the log files will be used to resynchronize the other quorum members. - done. QuorumServiceBase: added a logWriteCacheBlock() method. This is where we will log the HAWriteMessage and WriteCache blocks. Nobody is calling this method yet. That is the next step. - done. Modified AbstractJournal to NOT take the internal field lock for getLastCommitTime() and getRootBlockView(). Taking that lock was causing deadlocks based on lock ordering problems. - done. Modified QuorumCommitImpl to use getService() to access the leader rather than getLeader(token). The latter returns the RMI interface; the former is the local object. The 2-phase commit protocol had been going through RMI to reach the leader even when it was already running on the leader. @see https://sourceforge.net/apps/trac/bigdata/ticket/530 (Journal HA) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumService.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/pipeline/HAWriteMessageBase.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/Journal.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/ha/HAWriteMessage.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/ha/TestAll.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-B.config branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-C.config branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.config 
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java branches/BIGDATA_RELEASE_1_2_0/src/resources/HAJournal/HAJournal.config Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/ha/TestHAWriteMessage.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -381,10 +381,12 @@ { /* - * Run the operation on the leader using local method call in the - * caller's thread to avoid deadlock. + * Run the operation on the leader using a local method call + * (non-RMI) in the caller's thread to avoid deadlock. */ - final Future<Void> f = member.getLeader(token).abort2Phase(token); + member.assertLeader(token); + final S leader = member.getService(); + final Future<Void> f = leader.abort2Phase(token); remoteFutures.add(f); // // Note: This runs synchronously (ignores timeout). // f.run(); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -31,6 +31,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.Future; +import com.bigdata.io.writecache.WriteCache; import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.quorum.Quorum; @@ -97,4 +98,20 @@ // */ // HAReceiveService<HAWriteMessage> getHAReceiveService(); + /** + * Return the lastCommitTime for this service (based on its current root + * block). This supports the {@link HAWriteMessage} which requires this + * information as part of the metadata about replicated {@link WriteCache} + * blocks. + */ + long getLastCommitTime(); + + /** + * Return the lastCommitCounter for this service (based on its current root + * block). This supports the {@link HAWriteMessage} which requires this + * information as part of the metadata about replicated {@link WriteCache} + * blocks. + */ + long getLastCommitCounter(); + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -24,6 +24,7 @@ import com.bigdata.io.DirectBufferPool; import com.bigdata.io.IBufferAccess; import com.bigdata.journal.ha.HAWriteMessage; +import com.bigdata.quorum.QuorumException; import com.bigdata.quorum.QuorumMember; import com.bigdata.quorum.QuorumStateChangeListener; import com.bigdata.quorum.QuorumStateChangeListenerBase; @@ -155,7 +156,7 @@ * follower. */ private IBufferAccess receiveBuffer; - + /** * Cached metadata about the downstream service. 
*/ @@ -790,6 +791,20 @@ lock.lock(); try { + + if (receiveBuffer == null) { + + /* + * The quorum broke and the receive buffer was cleared or + * possibly we have become a leader. + * + * TODO We should probably pass in the Quorum and then just + * assert that the msg.getQuorumToken() is valid for the quorum. + */ + + throw new QuorumException(); + + } final PipelineState<S> downstream = pipelineStateRef.get(); @@ -797,12 +812,12 @@ log.trace("Will receive " + ((downstream != null) ? " and replicate" : "") + ": msg=" + msg); - + final ByteBuffer b = getReceiveBuffer(); final HAReceiveService<HAWriteMessage> receiveService = getHAReceiveService(); - if (downstream == null) { + if (downstream == null) { /* * This is the last service in the write pipeline, so just receive @@ -921,7 +936,6 @@ this.b = b; this.downstream = downstream; this.receiveService = receiveService; - } public Void call() throws Exception { @@ -1048,6 +1062,5 @@ } } - } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumService.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumService.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumService.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -27,6 +27,8 @@ package com.bigdata.ha; +import java.io.File; + import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumMember; @@ -63,5 +65,16 @@ * block). */ long getLastCommitTime(); + + /** + * Return the lastCommitCounter for this service (based on its current root + * block). + */ + long getLastCommitCounter(); + /** + * Return the directory in which we are logging the write blocks. + */ + File getHALogDir(); + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -27,8 +27,12 @@ package com.bigdata.ha; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.ObjectOutputStream; import java.nio.ByteBuffer; +import java.util.Formatter; import java.util.UUID; import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -37,8 +41,7 @@ import org.apache.log4j.Logger; -import com.bigdata.ha.pipeline.HAReceiveService; -import com.bigdata.ha.pipeline.HASendService; +import com.bigdata.io.writecache.WriteCacheService; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.IResourceManager; import com.bigdata.journal.IRootBlockView; @@ -102,13 +105,27 @@ addListener(this.pipelineImpl = new QuorumPipelineImpl<S>(this) { @Override - protected void handleReplicatedWrite(HAWriteMessage msg, - ByteBuffer data) throws Exception { + protected void handleReplicatedWrite(final HAWriteMessage msg, + final ByteBuffer data) throws Exception { - QuorumServiceBase.this.handleReplicatedWrite(msg,data); + QuorumServiceBase.this.handleReplicatedWrite(msg, data); + + } + + @Override + public long getLastCommitTime() { + + return QuorumServiceBase.this.getLastCommitTime(); } - + + @Override + public long getLastCommitCounter() { + + return QuorumServiceBase.this.getLastCommitCounter(); + + } + }); addListener(this.commitImpl = new 
QuorumCommitImpl<S>(this)); @@ -202,7 +219,99 @@ */ abstract protected void handleReplicatedWrite(HAWriteMessage msg, ByteBuffer data) throws Exception; + + /** + * Log the {@link HAWriteMessage} and the associated data onto the + * appropriate log file. + * <p> + * Note: Logging MUST NOT start in the middle of a write set (all log files + * must be complete). The {@link HAWriteMessage#getSequence()} MUST be ZERO + * (0) when we open a log file. + * + * TODO The WORM should not bother to log the write cache block since it can + * be obtained directly from the backing store. Abstract out an object to + * manage the log file for a commit counter and make it smart about the WORM + * versus the RWStore. The same object will need to handle the read back + * from the log file and in the case of the WORM should get the data block + * from the backing file. However, this object is not responsible for replay + * of log files. We'll have to locate a place for that logic next - probably + * in this class (QuorumServiceBase). + * + * FIXME NOTHING IS CALLING THIS CODE YET! Invoke from + * {@link #handleReplicatedWrite(HAWriteMessage, ByteBuffer)} and from + * {@link WriteCacheService}'s WriteTask.call() method (on the leader). + */ + public void logWriteCacheBlock(final HAWriteMessage msg, + final ByteBuffer data) throws IOException { +// final long currentCommitCounter = getLastCommitCounter(); + + getQuorum().assertQuorum(msg.getQuorumToken()); + + if (msg.getSequence() == 0L) { + + if (processLog != null) { + processLog.close(); + processLog = null; + } + + /* + * The commit counter that will be used to identify the file. + * + * Note: We use commitCounter+1 so the file will be labeled by the + * commit point that will be achieved when that log file is applied + * to a journal whose current commit point is [commitCounter]. + */ + final long commitCounter = msg.getCommitCounter() + 1; + + /* + * Format the name of the log file. + * + * Note: The commit counter in the file name should be zero filled + * to 20 digits so we have the files in lexical order in the file + * system (for convenience). + */ + final String logFile; + { + + final StringBuilder sb = new StringBuilder(); + + final Formatter f = new Formatter(sb); + + f.format("%020d.log", commitCounter); + + logFile = sb.toString(); + + } + + // Establish new log file. + processLog = new ObjectOutputStream(new FileOutputStream(new File( + getHALogDir(), logFile))); + + } + + /* + * FIXME We need to track whether or not we began the sequence at ZERO + * (0). If we did, then we can open a log and start writing for the + * current commitCounter. We do need to keep track of the commit counter + * associated with the log file so we can correctly refuse to log blocks + * on the log file that are associated with a different commit counter. + * We also need to manage the abort() and commit() transitions, ensuring + * that we truncate() the log for abort() (assuming it is for the same + * commit counter) and that we append the root block, force() and + * close() the log for commit. + */ + + } + + /** + * Process log to which the receiveService should write the messages, or + * <code>null</code> if we may not write on it. + * + * FIXME We need to clear this any time the quorum breaks. + */ + private ObjectOutputStream processLog = null; + /* * QuorumCommit. 
*/ @@ -234,14 +343,30 @@ } @Override - public long getLastCommitTime() { + final public long getLastCommitTime() { final L localService = getLocalService(); - return localService.getLastCommitTime(); + return localService.getRootBlockView().getLastCommitTime(); } + @Override + final public long getLastCommitCounter() { + + final L localService = getLocalService(); + + return localService.getRootBlockView().getCommitCounter(); + + } + + @Override + final public File getHALogDir() { + + return getLocalService().getHALogDir(); + + } + /* * QuorumRead */ Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/pipeline/HAWriteMessageBase.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/pipeline/HAWriteMessageBase.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/pipeline/HAWriteMessageBase.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -29,6 +29,8 @@ import java.io.ObjectInput; import java.io.ObjectOutput; +import com.bigdata.journal.ha.HAWriteMessage; + /** * Base class for RMI messages used to communicate metadata about a raw data * transfer occurring on a socket channel. @@ -60,10 +62,14 @@ * The Adler32 checksum of the bytes to be transferred. */ public HAWriteMessageBase(final int sze, final int chk) { + if (sze <= 0) throw new IllegalArgumentException(); - this.sze = sze; - this.chk = chk; + + this.sze = sze; + + this.chk = chk; + } /** @@ -71,12 +77,18 @@ */ public HAWriteMessageBase() {} + /** The #of bytes of data to be transferred. */ public int getSize() { - return sze; + + return sze; + } - + + /** The Adler32 checksum of the bytes to be transferred. */ public int getChk() { - return chk; + + return chk; + } public String toString() { @@ -85,9 +97,41 @@ } + @Override + public boolean equals(final Object obj) { + + if (this == obj) + return true; + + if (!(obj instanceof HAWriteMessageBase)) + return false; + + final HAWriteMessageBase t = (HAWriteMessageBase) obj; + + return sze == t.getSize() && chk == t.getChk(); + + } + + @Override + public int hashCode() { + + // The checksum is a decent hash code if given; otherwise use the size. + return chk == 0 ? sze : chk; + + } + + private static final transient short VERSION0 = 0x0; + + private static final transient short currentVersion = VERSION0; + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + final short version = in.readShort(); + + if (version != VERSION0) + throw new RuntimeException("Bad version for serialization"); + sze = in.readInt(); chk = in.readInt(); @@ -96,6 +140,8 @@ public void writeExternal(final ObjectOutput out) throws IOException { + out.writeShort(currentVersion); + out.writeInt(sze); out.writeInt(chk); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -371,6 +371,22 @@ */ private boolean m_closedForWrites = false; + /** + * The sequence must be set when the cache is ready to be flushed. In HA this + * is sent down the pipeline to ensure correct synchronization when processing + * logged messages. 
+ */ + private long sequence = -1; + + /** + * The sequence must be set when the cache is ready to be flushed. In HA this + * is sent down the pipeline to ensure correct synchronization when processing + * logged messages. + */ + void setSequence(final long i) { + sequence = i; + } + /** * Create a {@link WriteCache} from either a caller supplied buffer or a * direct {@link ByteBuffer} allocated from the {@link DirectBufferPool}. @@ -1327,9 +1343,17 @@ * * @return cache A {@link WriteCache} to be replicated. */ - public HAWriteMessage newHAWriteMessage(final long quorumToken) { + final public HAWriteMessage newHAWriteMessage(// + final long quorumToken, + final long lastCommitCounter,// + final long lastCommitTime// + ) { - return new HAWriteMessage(bytesWritten(), getWholeBufferChecksum(), + return new HAWriteMessage( + lastCommitCounter,// + lastCommitTime,// + sequence, // + bytesWritten(), getWholeBufferChecksum(), prefixWrites ? StoreTypeEnum.RW : StoreTypeEnum.WORM, quorumToken, fileExtent.get(), firstOffset.get()); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -64,8 +64,10 @@ import com.bigdata.io.IReopenChannel; import com.bigdata.io.writecache.WriteCache.WriteCacheCounters; import com.bigdata.journal.AbstractBufferStrategy; +import com.bigdata.journal.IBufferStrategy; import com.bigdata.journal.RWStrategy; import com.bigdata.journal.WORMStrategy; +import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumMember; import com.bigdata.rawstore.Bytes; @@ -485,6 +487,15 @@ localWriteFuture = localWriteService.submit(newWriteTask()); } + + /** + * Called from {@link IBufferStrategy#commit()} and {@link #reset()} to + * reset WriteCache sequence for HA synchronization. + */ + public void resetSequence() { + cacheSequence = 0; + } + private volatile long cacheSequence = 0; protected Callable<Void> newWriteTask() { @@ -556,6 +567,9 @@ /* * Only process non-empty cache buffers. */ + + // increment writeCache sequence + cache.setSequence(cacheSequence++); if (quorum != null && quorum.isHighlyAvailable()) { @@ -589,14 +603,31 @@ // flip(limit=pos;pos=0) b.flip(); assert b.remaining() > 0 : "Empty cache: " + cache; + // send to 1st follower. @SuppressWarnings("unchecked") final QuorumPipeline<HAPipelineGlue> quorumMember = (QuorumPipeline<HAPipelineGlue>) quorum .getMember(); + assert quorumMember != null : "Not quorum member?"; - remoteWriteFuture = quorumMember.replicate( - cache.newHAWriteMessage(quorumToken), b); + + final HAWriteMessage msg = cache.newHAWriteMessage( + quorumToken,// + quorumMember.getLastCommitCounter(),// + quorumMember.getLastCommitTime()// + ); + + /* + * FIXME The quorum leader must log the write cache + * block. However, it must be logged exactly once + * (if there is a retry, we do not want to re-log + * the block!) 
+ */ + + remoteWriteFuture = quorumMember.replicate(msg, b); + counters.get().nsend++; + } /* @@ -614,8 +645,8 @@ if (remoteWriteFuture != null) { try { remoteWriteFuture.get(); - } catch (ExecutionException ex) { - retrySend(quorum, cache, ex); + } catch (ExecutionException ex) { + retrySend(quorum, cache, ex); } } @@ -799,11 +830,19 @@ b.flip(); assert b.remaining() > 0 : "Empty cache: " + cache; + + @SuppressWarnings("unchecked") + final QuorumPipeline<HAPipelineGlue> quorumMember = (QuorumPipeline<HAPipelineGlue>) quorum + .getMember(); + + final HAWriteMessage msg = cache.newHAWriteMessage(// + quorumToken,// + quorumMember.getLastCommitCounter(),// + quorumMember.getLastCommitTime()// + ); // send to 1st follower. - remoteWriteFuture = ((QuorumPipeline<HAPipelineGlue>) quorum - .getMember()).replicate(cache - .newHAWriteMessage(quorumToken), b); + remoteWriteFuture = quorumMember.replicate(msg, b); counters.get().nsend++; @@ -852,7 +891,7 @@ IReopenChannel<? extends Channel> opener, final long fileExtent) throws InterruptedException; - /** + /** * {@inheritDoc} * <p> * This implementation calls {@link IWriteCache#reset()} on all @@ -942,6 +981,9 @@ c.nclean = buffers.length-1; c.nreset++; } + + // reset cacheSequence for HA + cacheSequence = 0; /* * Restart the WriteTask Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -90,6 +90,7 @@ import com.bigdata.io.SerializerUtil; import com.bigdata.journal.Name2Addr.Entry; import com.bigdata.journal.ha.HAWriteMessage; +import com.bigdata.journal.jini.ha.HAJournal; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.mdi.JournalMetadata; import com.bigdata.quorum.Quorum; @@ -1507,8 +1508,22 @@ return tmp.getFile(); + } + + /** + * The HA log directory. + * + * @see HAJournal.Options#HA_LOG_DIR + * + * @throws UnsupportedOperationException + * always. + */ + public File getHALogDir() { + + throw new UnsupportedOperationException(); + } - + /** * Core implementation of immediate shutdown handles event reporting. */ @@ -2100,14 +2115,26 @@ } + /** + * {@inheritDoc} + * <p> + * Returns the current root block (immediate, non-blocking peek). + * <p> + * Note: The root block reference can be <code>null</code> until the journal + * has been initialized. Once it has been set, the root block will always be + * non-<code>null</code>. Since this method does not obtain the inner lock, + * it is possible for another thread to change the root block reference + * through a concurrent {@link #abort()} or {@link #commitNow(long)}. The + * {@link IRootBlockView} itself is an immutable data structure. 
+ */ final public IRootBlockView getRootBlockView() { - final ReadLock lock = _fieldReadWriteLock.readLock(); +// final ReadLock lock = _fieldReadWriteLock.readLock(); +// +// lock.lock(); +// +// try { - lock.lock(); - - try { - if (_rootBlock == null) { /* @@ -2122,30 +2149,30 @@ return _rootBlock; - } finally { +// } finally { +// +// lock.unlock(); +// +// } - lock.unlock(); - - } - } final public long getLastCommitTime() { - final ReadLock lock = _fieldReadWriteLock.readLock(); +// final ReadLock lock = _fieldReadWriteLock.readLock(); +// +// lock.lock(); +// +// try { - lock.lock(); - - try { - return _rootBlock.getLastCommitTime(); - } finally { +// } finally { +// +// lock.unlock(); +// +// } - lock.unlock(); - - } - } /** Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/Journal.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/Journal.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/Journal.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -1010,6 +1010,7 @@ * IResourceManager */ + @Override public File getTmpDir() { return tmpDir; @@ -1020,6 +1021,7 @@ * The directory in which the journal's file is located -or- * <code>null</code> if the journal is not backed by a file. */ + @Override public File getDataDir() { final File file = getFile(); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -1090,15 +1090,19 @@ * {@inheritDoc} * <p> * This implementation flushes the write cache (if enabled). - * - * @todo Should be a NOP for the WORM? Check - * {@link AbstractJournal#commitNow(long)} */ @Override public void commit() { flushWriteCache(); + if (writeCacheService != null) { + + // Reset the write cache block counter. + writeCacheService.resetSequence(); + + } + } /** Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/ha/HAWriteMessage.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/ha/HAWriteMessage.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/ha/HAWriteMessage.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -48,7 +48,16 @@ */ private static final long serialVersionUID = -2673171474897401979L; - /** The type of backing store (RW or WORM). */ + /** The most recent commit counter associated with this message */ + private long commitCounter; + + /** The most recent commit time associated with this message */ + private long commitTime; + + /** The write sequence since last commit beginning at zero */ + private long sequence; + + /** The type of backing store (RW or WORM). */ private StoreTypeEnum storeType; /** The quorum token for which this message is valid. */ @@ -60,10 +69,25 @@ /** The file offset at which the data will be written (WORM only). */ private long firstOffset; - // /** The write cache buffer sequence number (incremented for each buffer - // sent by the master). 
*/ - // private long sequenceId; + /** The commit counter associated with this message */ + public long getCommitCounter() { + return commitCounter; + } + + /** The commit time associated with this message. */ + public long getCommitTime() { + return commitTime; + } + /** + * The write cache buffer sequence number (reset to ZERO (0) for the first + * message after each commit and incremented for each buffer sent by the + * leader). + */ + public long getSequence() { + return sequence; + } + /** The type of backing store (RW or WORM). */ public StoreTypeEnum getStoreType() { return storeType; @@ -86,11 +110,18 @@ public String toString() { - return getClass().getName() + "{size=" + getSize() + ",chksum=" - + getChk() + ",storeType=" + getStoreType() + ",quorumToken=" - + getQuorumToken() + ",fileExtent=" + getFileExtent() - + ",firstOffset=" + getFirstOffset() + "}"; - + return getClass().getName() // + + "{size=" + getSize() // + + ",chksum=" + getChk() // + + ",commitCounter=" + commitCounter // + + ",commitTime=" + commitTime // + + ",sequence=" + sequence // + + ",storeType=" + getStoreType() // + + ",quorumToken=" + getQuorumToken()// + + ",fileExtent=" + getFileExtent() // + + ",firstOffset=" + getFirstOffset() // + + "}"; + } /** @@ -100,6 +131,16 @@ } /** + * @param commitCounter + * The commit counter for the current root block for the write + * set which is being replicated by this message. + * @param commitTime + * The commit time for the current root block for the write set + * which is being replicated by this message. + * @param sequence + * The write cache block sequence number. This is reset to ZERO + * (0) for the first replicated write cache block in each write + * set. * @param sze * The #of bytes in the payload. * @param chk @@ -113,7 +154,8 @@ * @param firstOffset * The file offset at which the data will be written (WORM only). 
*/ - public HAWriteMessage(final int sze, final int chk, + public HAWriteMessage(final long commitCounter, final long commitTime, + final long sequence, final int sze, final int chk, final StoreTypeEnum storeType, final long quorumToken, final long fileExtent, final long firstOffset) { @@ -122,6 +164,12 @@ if (storeType == null) throw new IllegalArgumentException(); + this.commitCounter = commitCounter; + + this.commitTime = commitTime; + + this.sequence = sequence; + this.storeType = storeType; this.quorumToken = quorumToken; @@ -134,30 +182,60 @@ private static final byte VERSION0 = 0x0; - public void readExternal(final ObjectInput in) throws IOException, - ClassNotFoundException { + @Override + public boolean equals(final Object obj) { + + if (this == obj) + return true; - super.readExternal(in); - final byte version = in.readByte(); - switch (version) { - case VERSION0: - break; - default: - throw new IOException("Unknown version: " + version); - } - storeType = StoreTypeEnum.valueOf(in.readByte()); - quorumToken = in.readLong(); - fileExtent = in.readLong(); - firstOffset = in.readLong(); - } + if (!super.equals(obj)) + return false; - public void writeExternal(final ObjectOutput out) throws IOException { - super.writeExternal(out); - out.write(VERSION0); - out.writeByte(storeType.getType()); - out.writeLong(quorumToken); - out.writeLong(fileExtent); - out.writeLong(firstOffset); + if (!(obj instanceof HAWriteMessage)) + return false; + + final HAWriteMessage other = (HAWriteMessage) obj; + + return commitCounter == other.getCommitCounter() + && commitTime == other.getCommitTime() // + && sequence == other.getSequence() + && storeType == other.getStoreType() + && quorumToken == other.getQuorumToken() + && fileExtent == other.getFileExtent() + && firstOffset == other.getFirstOffset(); + } + public void readExternal(final ObjectInput in) throws IOException, + ClassNotFoundException { + + super.readExternal(in); + final byte version = in.readByte(); + switch (version) { + case VERSION0: + break; + default: + throw new IOException("Unknown version: " + version); + } + storeType = StoreTypeEnum.valueOf(in.readByte()); + commitCounter = in.readLong(); + commitTime = in.readLong(); + sequence = in.readLong(); + quorumToken = in.readLong(); + fileExtent = in.readLong(); + firstOffset = in.readLong(); + } + + public void writeExternal(final ObjectOutput out) throws IOException { + super.writeExternal(out); + out.write(VERSION0); + out.writeByte(storeType.getType()); + out.writeLong(commitCounter); + out.writeLong(commitTime); + out.writeLong(sequence); + out.writeLong(quorumToken); + out.writeLong(fileExtent); + out.writeLong(firstOffset); + } + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -2578,6 +2578,7 @@ try { m_writeCache.flush(true); + m_writeCache.resetSequence(); } catch (InterruptedException e) { log.error(e, e); throw new RuntimeException(e); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/ha/TestAll.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/ha/TestAll.java 2012-10-02 16:32:04 UTC (rev 6642) +++ 
branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/ha/TestAll.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -62,9 +62,11 @@ final TestSuite suite = new TestSuite("high availability"); + suite.addTestSuite(TestHAWriteMessage.class); + // if (s_includeHA) suite.addTest(com.bigdata.ha.pipeline.TestAll.suite()); - + return suite; } Added: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/ha/TestHAWriteMessage.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/ha/TestHAWriteMessage.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/ha/TestHAWriteMessage.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -0,0 +1,80 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha; + +import java.io.IOException; + +import junit.framework.TestCase; + +import com.bigdata.btree.BytesUtil; +import com.bigdata.io.SerializerUtil; +import com.bigdata.journal.StoreTypeEnum; +import com.bigdata.journal.ha.HAWriteMessage; + +public class TestHAWriteMessage extends TestCase { + + /** + * Simple test to verify HAWriteMessage serialization + */ + public void testSerialization() throws IOException, ClassNotFoundException { + + final HAWriteMessage msg1 = new HAWriteMessage( + 12L,// commitCounter + 13L,// commitTime + 14L,// sequence + 15,// size + 16,// checksum + StoreTypeEnum.RW,// + 17L,// quorumToken + 18L,// fileExtent + 19L // firstOffset + ); + + final byte[] ser1 = serialized(msg1); + + final HAWriteMessage msg2 = (HAWriteMessage) SerializerUtil + .deserialize(ser1); + + assertTrue(msg1.equals(msg2)); + + // now confirm serialized byte equivalence in case we just messed up + // equals + final byte[] ser2 = serialized(msg2); + + assertTrue(BytesUtil.bytesEqual(ser1, ser2)); + +// System.err.println("msg1: " + msg1); +// System.err.println("msg2: " + msg2); + + } + + /** + * Utility to return byte[] serialization of the HAWriteMessage + */ + private byte[] serialized(final HAWriteMessage msg) { + + return SerializerUtil.serialize(msg); + } + +} Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -53,11 +53,9 @@ import com.bigdata.ha.HAPipelineGlue; import com.bigdata.ha.QuorumPipeline; import com.bigdata.ha.QuorumPipelineImpl; -import com.bigdata.ha.pipeline.HAReceiveService; -import com.bigdata.ha.pipeline.HASendService; import 
com.bigdata.io.DirectBufferPool; +import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IBufferAccess; -import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IReopenChannel; import com.bigdata.io.TestCase3; import com.bigdata.io.writecache.WriteCache.FileChannelScatteredWriteCache; @@ -67,10 +65,10 @@ import com.bigdata.quorum.AbstractQuorumMember; import com.bigdata.quorum.AbstractQuorumTestCase; import com.bigdata.quorum.MockQuorumFixture; +import com.bigdata.quorum.MockQuorumFixture.MockQuorum; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumActor; import com.bigdata.quorum.QuorumMember; -import com.bigdata.quorum.MockQuorumFixture.MockQuorum; import com.bigdata.rawstore.Bytes; import com.bigdata.util.ChecksumUtility; @@ -303,6 +301,20 @@ } + @Override + public long getLastCommitTime() { + + return MyMockQuorumMember.this.getLastCommitTime(); + + } + + @Override + public long getLastCommitCounter() { + + return MyMockQuorumMember.this.getLastCommitCounter(); + + } + }); } @@ -358,6 +370,23 @@ } + @Override + public long getLastCommitTime() { + + return lastCommitTime; + + } + + @Override + public long getLastCommitCounter() { + + return lastCommitCounter; + + } + + private long lastCommitCounter = 0; + private long lastCommitTime = 0; + } // MockQuorumMemberImpl /** Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -27,9 +27,6 @@ package com.bigdata.journal.ha; -import com.bigdata.ha.pipeline.TestHASendAndReceive; -import com.bigdata.ha.pipeline.TestHASendAndReceive3Nodes; - import junit.framework.Test; import junit.framework.TestCase; import junit.framework.TestSuite; Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-B.config =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-B.config 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-B.config 2012-10-03 15:13:47 UTC (rev 6643) @@ -21,6 +21,7 @@ import com.bigdata.util.config.NicUtil; import com.bigdata.journal.Options; import com.bigdata.journal.BufferMode; +import com.bigdata.journal.jini.ha.HAJournal; import com.bigdata.jini.lookup.entry.*; import com.bigdata.service.IBigdataClient; import com.bigdata.service.AbstractTransactionService; @@ -89,6 +90,9 @@ // journal data directory. private static dataDir = serviceDir; + // HA log directory. + private static logDir = new File(serviceDir,"logs"); + // one federation, multicast discovery. 
//static private groups = LookupDiscovery.ALL_GROUPS; @@ -278,6 +282,8 @@ new NV(AbstractTransactionService.Options.MIN_RELEASE_AGE,"1"), + new NV(HAJournal.Options.HA_LOG_DIR, ""+bigdata.logDir), + }, bigdata.kb); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-C.config =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-C.config 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-C.config 2012-10-03 15:13:47 UTC (rev 6643) @@ -21,6 +21,7 @@ import com.bigdata.util.config.NicUtil; import com.bigdata.journal.Options; import com.bigdata.journal.BufferMode; +import com.bigdata.journal.jini.ha.HAJournal; import com.bigdata.jini.lookup.entry.*; import com.bigdata.service.IBigdataClient; import com.bigdata.service.AbstractTransactionService; @@ -89,6 +90,9 @@ // journal data directory. private static dataDir = serviceDir; + // HA log directory. + private static logDir = new File(serviceDir,"logs"); + // one federation, multicast discovery. //static private groups = LookupDiscovery.ALL_GROUPS; @@ -278,6 +282,8 @@ new NV(AbstractTransactionService.Options.MIN_RELEASE_AGE,"1"), + new NV(HAJournal.Options.HA_LOG_DIR, ""+bigdata.logDir), + }, bigdata.kb); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.config =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.config 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.config 2012-10-03 15:13:47 UTC (rev 6643) @@ -21,6 +21,7 @@ import com.bigdata.util.config.NicUtil; import com.bigdata.journal.Options; import com.bigdata.journal.BufferMode; +import com.bigdata.journal.jini.ha.HAJournal; import com.bigdata.jini.lookup.entry.*; import com.bigdata.service.IBigdataClient; import com.bigdata.service.AbstractTransactionService; @@ -89,6 +90,9 @@ // journal data directory. private static dataDir = serviceDir; + // HA log directory. + private static logDir = new File(serviceDir,"logs"); + // one federation, multicast discovery. 
//static private groups = LookupDiscovery.ALL_GROUPS; @@ -279,6 +283,8 @@ new NV(AbstractTransactionService.Options.MIN_RELEASE_AGE,"1"), + new NV(HAJournal.Options.HA_LOG_DIR, ""+bigdata.logDir), + }, bigdata.kb); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -23,6 +23,7 @@ */ package com.bigdata.journal.jini.ha; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.net.InetSocketAddress; @@ -37,11 +38,16 @@ import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.ha.HAGlue; import com.bigdata.ha.QuorumService; +import com.bigdata.io.writecache.WriteCache; import com.bigdata.journal.BufferMode; +import com.bigdata.journal.IRootBlockView; import com.bigdata.journal.Journal; import com.bigdata.journal.ValidationError; +import com.bigdata.journal.WORMStrategy; +import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.zk.ZKQuorumImpl; +import com.bigdata.rwstore.RWStore; import com.bigdata.service.AbstractTransactionService; import com.bigdata.service.proxy.ThickFuture; @@ -86,9 +92,70 @@ String WRITE_PIPELINE_ADDR = HAJournal.class.getName() + ".writePipelineAddr"; + /** + * The required property whose value is the name of the directory in + * which write-ahead log files will be created to support + * resynchronization services trying to join an HA quorum. + * <p> + * The directory should not contain any other files. It will be + * populated with files whose names correspond to commit counters. The + * commit counter is recorded in the root block at each commit. It is + * used to identify the write set for a given commit. A log file is + * written for each commit point. Each log file is normally deleted at + * the commit. However, if the quorum is not fully met at the commit, + * then the log files will not be deleted. Instead, they will be used to + * resynchronize the other quorum members. + * <p> + * The log file name includes the value of the commit counter for the + * commit point that will be achieved when that log file is applied to a + * journal whose current commit point is [commitCounter-1]. The commit + * counter for a new journal (without any commit points) is ZERO (0). + * Thus the first log file will be labeled with the value ONE (1). The + * commit counter is written out with leading zeros in the log file name + * so the natural sort order of the log files should correspond to the + * ascending commit order. + * <p> + * The log files are a sequence of zero or more {@link HAWriteMessage} + * objects. For the {@link RWStore}, each {@link HAWriteMessage} is + * followed by the data from the corresponding {@link WriteCache} block. + * For the {@link WORMStrategy}, the {@link WriteCache} block is omitted + * since this data can be trivially reconstructed from the backing file. + * When the quorum prepares for a commit, the proposed root block is + * written onto the end of the log file. + * <p> + * The log files are deleted once the quorum is fully met (k out of k + * nodes have met in the quorum). It is possible for a quorum to form + * with only <code>(k+1)/2</code> nodes. 
When this happens, the nodes in + * the quorum will all write log files into the {@link #HA_LOG_DIR}. + * Those files will remain until the other nodes in the quorum + * synchronize and join the quorum. Once the quorum is fully met, the + * files in the log directory will be deleted. + * <p> + * If some or all log files are not present, then any node that is not + * synchronized with the quorum must be rebuilt from scratch rather than + * by incrementally applying logged write sets until it catches up and + * can join the quorum. + * + * @see IRootBlockView#getCommitCounter() + * + * TODO We may need to also write a marker either on the head or + * tail of the log file to indicate that the commit was applied. + * Work this out when we work through the extension to the 2-phase + * commit protocol that supports resynchronization. + */ + String HA_LOG_DIR = HAJournal.class.getName() + ".haLogDir"; + } + /** + * @see Options#WRITE_PIPELINE_ADDR + */ private final InetSocketAddress writePipelineAddr; + + /** + * @see Options#HA_LOG_DIR + */ + private final File haLogDir; public HAJournal(final Properties properties) { @@ -110,6 +177,17 @@ writePipelineAddr = (InetSocketAddress) properties .get(Options.WRITE_PIPELINE_ADDR); + final String logDirStr = properties.getProperty(Options.HA_LOG_DIR); + + haLogDir = new File(logDirStr); + + if (!haLogDir.exists()) { + + // Create the directory. + haLogDir.mkdirs(); + + } + } /** @@ -172,6 +250,15 @@ } + final String logDirStr = properties.getProperty(Options.HA_LOG_DIR); + + if (logDirStr == null) { + + throw new IllegalArgumentException(Options.HA_LOG_DIR + + " : must be specified"); + + } + return properties; } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java 2012-10-03 15:13:47 UTC (rev 6643) @@ -939,9 +939,16 @@ // } return null; } catch (Throwable t) { + log.error(t); if (cxn != null && !cxn.isReadOnly()) { /* * Force rollback of the connection. + * + * Note: It is possible that the commit has already been + * processed, in which case this rollback() will be a NOP. + * This can happen when there is an IO error when + * communicating with the client, but the database has + * already gone through a commit. */ cxn.rollback(); } @@ -1418,9 +1425,8 @@ lastOp = thisOp; // Write out the LOAD operation. - body.node("p")// - .node("pre").text(thisOp.toString()).close()// - .close(); + body.node("pre").text(thisOp.toString())// + .close(); } @@ -1451,10 +1457,10 @@ pw.flush(); pw.close(); - body.node("p").text("ABORT")// + body.node("p").text("ABORT").close()// .node("pre").text(e.getUpdate().toString()).close()// .node("pre").text(w.toString()).close()// - .text("totalElapsed=" + totalElapsedMillis + .node("p").text("totalElapsed=" + totalElapsedMillis + "ms, elapsed=" + elapsedMillis + "ms") .close(); @@ -1488,16 +1494,15 @@ // .close(); } else { - body.node("p") - // - .node("pre") + body.node("pre") .text(e.getUpdate().toString()) .close() // + .node("p") .text("totalElapsed=" + totalElapsedMillis + "ms, elapsed=" + elapsedMillis + "ms")// .close(); - } + } // horizontal line after each operation. 
body.node("hr").close(); Modified: branches/BIGDATA_RELEASE_1_2_0/src/resources/HAJournal/HAJournal.config =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/src/resources/HAJournal/HAJournal.config 2012-10-02 16:32:04 UTC (rev 6642) +++ branches/BIGDATA_RELEASE_1_2_0/src/resources/HAJournal/HAJournal.config 2012-10-03 15:13:47 UTC (rev 6643) @@ -22,6 +22,7 @@ import com.bigdata.journal.Options; import com.bigdata.journal.BufferMode; import com.bigdata.journal.Journal; +import com.bigdata.journal.jini.ha.HAJournal; import com.bigdata.jini.lookup.entry.*; import com.bigdata.service.IBigdataClient; import com.bigdata.service.AbstractTransactionService; @@ -91,6 +92,9 @@ // journal data directory. private static dataDir = serviceDir; + // HA log directory. + private static logDir = new File(serviceDir,"logs"); + // one federation, multicast discovery. //static private groups = LookupDiscovery.ALL_GROUPS; @@ -286,6 +290,8 @@ new NV(AbstractTransactionService.Options.MIN_RELEASE_AGE,"1"), + new NV(HAJournal.Options.HA_LOG_DIR, ""+bigdata.logDir), + /* Enable statistics collection and reporting. */ new NV(Journal.Options.COLLECT_QUEUE_STATISTICS,"true"), This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |