From: <tho...@us...> - 2012-10-04 14:59:51
|
Revision: 6654 http://bigdata.svn.sourceforge.net/bigdata/?rev=6654&view=rev Author: thompsonbry Date: 2012-10-04 14:59:40 +0000 (Thu, 04 Oct 2012) Log Message: ----------- Working towards a functional integration of the HA Log Writer with the HAJournal. Fixed several bugs in the process log writer related to writing of ByteBuffers and to the consistency checks for the root block. Moved the HALogWriter initialization into the HAJournal constructor. Added getBytes(ByteBuffer) to BytesUtil and removed it from TestCase3. This is now used by the ProcessLogWriter. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/btree/BytesUtil.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/ProcessLogWriter.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/TestCase3.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/btree/BytesUtil.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/btree/BytesUtil.java 2012-10-04 14:17:47 UTC (rev 6653) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/btree/BytesUtil.java 2012-10-04 14:59:40 UTC (rev 6654) @@ -1579,4 +1579,43 @@ } + /** + * Return the data in the buffer. When possible, the backing array is + * returned. Otherwise, a new byte[] is allocated, the data are copied into + * the array, and the new array is returned. + */ + public static byte[] getBytes(ByteBuffer buf) { + + if (buf.hasArray() && buf.arrayOffset() == 0 && buf.position() == 0 + && buf.limit() == buf.capacity()) { + + /* + * Return the backing array. + */ + + return buf.array(); + + } + + /* + * Copy the expected data into a byte[] using a read-only view on the + * buffer so that we do not mess with its position, mark, or limit. + */ + final byte[] a; + { + + buf = buf.asReadOnlyBuffer(); + + final int len = buf.remaining(); + + a = new byte[len]; + + buf.get(a); + + } + + return a; + + } + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/ProcessLogWriter.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/ProcessLogWriter.java 2012-10-04 14:17:47 UTC (rev 6653) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/ProcessLogWriter.java 2012-10-04 14:59:40 UTC (rev 6654) @@ -10,6 +10,7 @@ import org.apache.log4j.Logger; +import com.bigdata.btree.BytesUtil; import com.bigdata.journal.IRootBlockView; import com.bigdata.journal.ha.HAWriteMessage; @@ -26,7 +27,10 @@ */ public class ProcessLogWriter { - private static final Logger log = Logger.getLogger(ProcessLogWriter.class); + /** + * Logger for HA events. + */ + protected static final Logger haLog = Logger.getLogger("com.bigdata.haLog"); /** HA log directory. */ private final File m_dir; @@ -136,23 +140,29 @@ if (rootBlock == null) throw new IllegalArgumentException(); - if (rootBlock.getCommitCounter() != this.m_rootBlock.getCommitCounter()) { + final long expectedCommitCounter = this.m_rootBlock.getCommitCounter() + 1; - throw new IllegalStateException(); + if (expectedCommitCounter != rootBlock.getCommitCounter()) { + throw new IllegalStateException("CommitCounter: expected=" + + expectedCommitCounter + ", actual=" + + rootBlock.getCommitCounter()); + } - if (rootBlock.getLastCommitTime() != this.m_rootBlock - .getLastCommitTime()) { +// if (rootBlock.getLastCommitTime() != this.m_rootBlock +// .getLastCommitTime()) { +// +// throw new IllegalStateException(); +// +// } - throw new IllegalStateException(); + if (!this.m_rootBlock.getUUID().equals(rootBlock.getUUID())) { - } + throw new IllegalStateException("Store UUID: expected=" + + (m_rootBlock.getUUID()) + ", actual=" + + rootBlock.getUUID()); - if (rootBlock.getUUID() != this.m_rootBlock.getUUID()) { - - throw new IllegalStateException(); - } writeRootBlock(rootBlock); @@ -169,10 +179,10 @@ if (rootBlock == null) throw new IllegalArgumentException(); - m_out.write(rootBlock.asReadOnlyBuffer().array()); + m_out.write(BytesUtil.getBytes(rootBlock.asReadOnlyBuffer())); - if (log.isDebugEnabled()) - log.debug("wrote root block: " + rootBlock); + if (haLog.isDebugEnabled()) + haLog.debug("wrote root block: " + rootBlock); } @@ -200,17 +210,23 @@ if (m_sequence != msg.getSequence()) return; - final byte[] array = data.array(); - m_out.writeObject(msg); switch(m_rootBlock.getStoreType()) { case RW: { + /* + * FIXME Efficient channel access and write. I think that we are + * much better off reusing the WORMStategy without the WriteCache + * and pre-serializing the HAWriteMessage as a byte[]. That will + * give us efficient, proven writes and a place to put both root + * blocks. + */ + final byte[] array = BytesUtil.getBytes(data); + assert msg.getSize() == array.length; - // TODO Efficient channel access and write - must flush first? - m_out.write(data.array()); + m_out.write(array); } case WORM: break; Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/TestCase3.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/TestCase3.java 2012-10-04 14:17:47 UTC (rev 6653) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/io/TestCase3.java 2012-10-04 14:59:40 UTC (rev 6654) @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; +import com.bigdata.btree.BytesUtil; import com.bigdata.journal.TestHelper; import junit.framework.TestCase; @@ -168,38 +169,10 @@ /** * Return the data in the buffer. */ - public static byte[] getBytes(ByteBuffer buf) { + public static byte[] getBytes(final ByteBuffer buf) { - if (buf.hasArray() && buf.arrayOffset() == 0 && buf.position() == 0 - && buf.limit() == buf.capacity()) { + return BytesUtil.getBytes(buf); - /* - * Return the backing array. - */ - - return buf.array(); - - } - - /* - * Copy the expected data into a byte[] using a read-only view on the - * buffer so that we do not mess with its position, mark, or limit. - */ - final byte[] a; - { - - buf = buf.asReadOnlyBuffer(); - - final int len = buf.remaining(); - - a = new byte[len]; - - buf.get(a); - - } - - return a; - } } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2012-10-04 14:17:47 UTC (rev 6653) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2012-10-04 14:59:40 UTC (rev 6654) @@ -37,6 +37,7 @@ import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.ha.HAGlue; +import com.bigdata.ha.ProcessLogWriter; import com.bigdata.ha.QuorumService; import com.bigdata.io.writecache.WriteCache; import com.bigdata.journal.BufferMode; @@ -157,6 +158,25 @@ */ private final File haLogDir; + /** + * Write ahead log for replicated writes used to resynchronize services that + * are not in the met quorum. + * + * @see Options#HA_LOG_DIR + * @see ProcessLogWriter + */ + private final ProcessLogWriter haLogWriter; + + /** + * The {@link ProcessLogWriter} for this {@link HAJournal} and never + * <code>null</code>. + */ + ProcessLogWriter getHALogWriter() { + + return haLogWriter; + + } + public HAJournal(final Properties properties) { this(properties, null); @@ -187,7 +207,10 @@ haLogDir.mkdirs(); } - + + // Set up the HA log writer. + haLogWriter = new ProcessLogWriter(haLogDir); + } /** Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-10-04 14:17:47 UTC (rev 6653) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-10-04 14:59:40 UTC (rev 6654) @@ -149,12 +149,6 @@ */ private HAJournal journal; - /** - * Write ahead log for replicated writes used to resynchronize services that - * are not in the met quorum. - */ - private ProcessLogWriter haLogWriter = null; - private UUID serviceUUID; private HAGlue haGlueService; private ZookeeperClientConfig zkClientConfig; @@ -381,8 +375,6 @@ this.journal = new HAJournal(properties, quorum); - this.haLogWriter = new ProcessLogWriter(journal.getHALogDir()); - } haGlueService = journal.newHAGlue(serviceUUID); @@ -600,7 +592,7 @@ try { - server.haLogWriter.disable(); + journal.getHALogWriter().disable(); } catch (IOException e) { @@ -626,8 +618,8 @@ try { - server.haLogWriter - .createLog(journal.getRootBlockView()); + journal.getHALogWriter().createLog( + journal.getRootBlockView()); } catch (IOException e) { @@ -736,7 +728,7 @@ if (!HA_LOG_ENABLED) return; - server.haLogWriter.write(msg, data); + journal.getHALogWriter().write(msg, data); } @@ -752,7 +744,7 @@ if (!HA_LOG_ENABLED) return; - server.haLogWriter.closeLog(rootBlock); + journal.getHALogWriter().closeLog(rootBlock); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |