From: <tho...@us...> - 2013-01-23 18:59:38
|
Revision: 6820 http://bigdata.svn.sourceforge.net/bigdata/?rev=6820&view=rev Author: thompsonbry Date: 2013-01-23 18:59:30 +0000 (Wed, 23 Jan 2013) Log Message: ----------- My edits from code review of READ_CACHE with Martyn. Modified Paths: -------------- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/IBackingReader.java branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/IWriteCache.java branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/IHABufferStrategy.java branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Options.java branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/VerifyCommitRecordIndex.java branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestJournalShutdown.java branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestTx.java branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java 2013-01-23 18:17:20 UTC (rev 6819) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/althalog/HALogFile.java 2013-01-23 18:59:30 UTC (rev 6820) @@ -34,11 +34,9 @@ import java.security.DigestException; import java.security.MessageDigest; import java.util.Formatter; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.log4j.Logger; @@ -163,7 +161,7 @@ // protected by m_writeLock private long m_writePosition = -1; - private AtomicLong m_sequence = new AtomicLong(0); + private long m_sequence = 0; /** * This constructor is called by the log manager to create the file. A @@ -341,7 +339,7 @@ + m_openRootBlock.getLastCommitTime() + ", but msg=" + msg); - if (m_sequence.get() != msg.getSequence()) + if (m_sequence != msg.getSequence()) throw new IllegalStateException("nextSequence=" + m_sequence + ", but msg=" + msg); @@ -392,7 +390,7 @@ throw new AssertionError(); } - m_sequence.incrementAndGet(); + m_sequence++; m_fileChange.signalAll(); Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/IBackingReader.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/IBackingReader.java 2013-01-23 18:17:20 UTC (rev 6819) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/IBackingReader.java 2013-01-23 18:59:30 UTC (rev 6820) @@ -41,9 +41,9 @@ * {@link ByteBuffer#remaining()} bytes will be read into the caller's * buffer, starting at the specified offset in the backing file. * - * @param offset - * The offset of the first byte (relative to the start of the - * data region). + * @param fileOffset + * The offset of the first byte to be read (absolute offset on + * the backing file). * @param dst * Where to put the data. Bytes will be written at position until * limit. @@ -51,6 +51,6 @@ * @return The caller's buffer, prepared for reading back the installed * record. */ - public ByteBuffer readRaw(long offset, ByteBuffer dst); + public ByteBuffer readRaw(long fileOffset, ByteBuffer dst); } Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/IWriteCache.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/IWriteCache.java 2013-01-23 18:17:20 UTC (rev 6819) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/io/writecache/IWriteCache.java 2013-01-23 18:59:30 UTC (rev 6820) @@ -44,42 +44,6 @@ */ public interface IWriteCache { -// /** -// * Write the record on the cache. This interface DOES NOT provide any -// * guarantee about the ordering of writes. Callers who require a specific -// * ordering must coordinate that ordering themselves, e.g., by synchronizing -// * across their writes onto the cache. -// * -// * @param offset -// * The offset of that record (maybe relative to a base offset). -// * @param data -// * The record. The bytes from the current -// * {@link ByteBuffer#position()} to the -// * {@link ByteBuffer#limit()} will be written and the -// * {@link ByteBuffer#position()} will be advanced to the -// * {@link ByteBuffer#limit()} . The caller may subsequently -// * modify the contents of the buffer without changing the state -// * of the cache (i.e., the data are copied into the cache). -// * -// * @return <code>true</code> iff the caller's record was transferred to the -// * cache. When <code>false</code>, there is not enough room left in -// * the write cache for this record. -// * -// * @throws InterruptedException -// * @throws IllegalStateException -// * If the buffer is closed. -// * @throws IllegalArgumentException -// * If the caller's record is larger than the maximum capacity of -// * cache (the record could not fit within the cache). The caller -// * should check for this and provide special handling for such -// * large records. For example, they can be written directly onto -// * the backing channel. -// * -// * @deprecated by {@link #writeChk(long, ByteBuffer, int)} -// */ -// public boolean write(final long offset, final ByteBuffer data) -// throws InterruptedException; - /** * Write the record on the cache. This interface DOES NOT provide any * guarantee about the ordering of writes. Callers who require a specific @@ -87,7 +51,7 @@ * across their writes onto the cache. * * @param offset - * The offset of that record (maybe relative to a base offset). + * The file offset of that record in the backing file. * @param data * The record. The bytes from the current * {@link ByteBuffer#position()} to the @@ -122,8 +86,8 @@ /** * Read a record from the write cache. * - * @param offset - * The offset of that record (maybe relative to a base offset). + * @param offset + * The file offset of that record in the backing file. * @param nbytes * The length of the record (decoded from the address by the * caller). @@ -146,36 +110,6 @@ public ByteBuffer read(final long offset, final int nbytes) throws InterruptedException, ChecksumError; -// /** -// * Update a record in the write cache. -// * -// * @param addr -// * The address of the record. -// * @param off -// * The byte offset of the update. -// * @param data -// * The data to be written onto the record in the cache starting -// * at that byte offset. The bytes from the current -// * {@link ByteBuffer#position()} to the -// * {@link ByteBuffer#limit()} will be written and the -// * {@link ByteBuffer#position()} will be advanced to the -// * {@link ByteBuffer#limit()}. The caller may subsequently modify -// * the contents of the buffer without changing the state of the -// * cache (i.e., the data are copied into the cache). -// * -// * @return <code>true</code> iff the record was updated and -// * <code>false</code> if no record for that address was found in the -// * cache. -// * -// * @throws InterruptedException -// * @throws IllegalStateException -// * -// * @throws IllegalStateException -// * if the buffer is closed. -// */ -// public boolean update(final long addr, final int off, final ByteBuffer data) -// throws IllegalStateException, InterruptedException; - /** * Flush the writes to the backing channel but does not force anything to * the backing channel. The caller is responsible for managing when the Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-01-23 18:17:20 UTC (rev 6819) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-01-23 18:59:30 UTC (rev 6820) @@ -1389,7 +1389,7 @@ if (_bufferStrategy.isOpen()) { if (log.isInfoEnabled()) - log.info("Closing journal in finalize: " + getFile()); + log.info("Closing journal: " + getFile()); shutdownNow(); Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/IHABufferStrategy.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/IHABufferStrategy.java 2013-01-23 18:17:20 UTC (rev 6819) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/IHABufferStrategy.java 2013-01-23 18:59:30 UTC (rev 6820) @@ -170,10 +170,10 @@ /** * Used to support the rebuild protocol. * - * @param position - absolute file offset + * @param fileOffset - absolute file offset * @param transfer - target buffer for read */ - ByteBuffer readRaw(long position, ByteBuffer transfer); + ByteBuffer readRaw(long fileOffset, ByteBuffer transfer); /** * Used to support the rebuild protocol Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Options.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Options.java 2013-01-23 18:17:20 UTC (rev 6819) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Options.java 2013-01-23 18:59:30 UTC (rev 6820) @@ -240,24 +240,43 @@ String WRITE_CACHE_BUFFER_COUNT = AbstractJournal.class.getName()+".writeCacheBufferCount"; /** + * <strong>ALPHA FEATURE</strong> + * <p> * Option may be used to override the #of {@link ReadCache} buffers which - * will be used with a {@link WriteCacheService}. + * will be used with a {@link WriteCacheService}. When ZERO (0) the read + * cache is disabled. + * <p> + * The purpose of the read cache is to provide additional buffering of the + * disk under the control of the database. When enabled, records read from + * the disk are installed into the read cache as well as write cache records + * once they have been flushed to the disk. Records evicted from the read + * cache are transferred to the hot cache (if enabled) if they have been + * touched multiple times while on the read cache. * * @see #DEFAULT_READ_CACHE_BUFFER_COUNT + * @see #HOT_CACHE_SIZE */ String READ_CACHE_BUFFER_COUNT = AbstractJournal.class.getName()+".readCacheBufferCount"; /** - * Option may be used to override the threshold at which retained readCache records - * are moved to the hotCache which will be used with a {@link WriteCacheService}. + * <strong>ALPHA FEATURE</strong> + * <p> + * Option may be used to override the #of touches at which a retained + * readCache record is moved to the hotCache. Records which are touched + * multiple times while on the read cache are transferred to the hot cache. + * Records evicted from the hot cache are dropped back to the read cache + * where they get a chance to demonstrate that they are still hot before + * being evicted from the read cache. * * @see #DEFAULT_HOT_CACHE_THRESHOLD */ String HOT_CACHE_THRESHOLD = AbstractJournal.class.getName()+".hotCacheThreshold"; /** - * Option may be used to override the number of hotCache buffers which will be - * used with a {@link WriteCacheService}. + * <strong>ALPHA FEATURE</strong> + * <p> + * Option may be used to override the number of hotCache buffers which will + * be used with a {@link WriteCacheService}. * * @see #DEFAULT_HOT_CACHE_SIZE */ @@ -632,10 +651,10 @@ */ String DEFAULT_WRITE_CACHE_COMPACTION_THRESHOLD = "20"; - /** - * The default for {@link #READ_CACHE_MAX_RECORD_SIZE}. - */ - String DEFAULT_READ_CACHE_MAX_RECORD_SIZE = ""+(2*Bytes.kilobyte); +// /** +// * The default for {@link #READ_CACHE_MAX_RECORD_SIZE}. +// */ +// String DEFAULT_READ_CACHE_MAX_RECORD_SIZE = ""+(2*Bytes.kilobyte); /** * The default for {@link #HOT_CACHE_THRESHOLD}. Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/VerifyCommitRecordIndex.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/VerifyCommitRecordIndex.java 2013-01-23 18:17:20 UTC (rev 6819) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/VerifyCommitRecordIndex.java 2013-01-23 18:59:30 UTC (rev 6820) @@ -94,6 +94,8 @@ final int removed = jrnl.removeCommitRecordEntries(zeroKey, releaseKey); + System.out.println("Commit Record Index verified with " + removed + " records removed"); + jrnl.commit(); } Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2013-01-23 18:17:20 UTC (rev 6819) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2013-01-23 18:59:30 UTC (rev 6820) @@ -980,7 +980,7 @@ final long fileExtent, final IReopenChannel<? extends Channel> opener, final Quorum quorum) throws InterruptedException { - + // Note: Compaction explicitly disabled for the WORM. super(writeCacheBufferCount, 0/* maxDirtyListSize */, readCacheBufferCount, false/* prefixWrites */, 100/* compactionThreshold */, hotCacheSize, hotCacheThreshold, useChecksums, extent, opener, quorum, WORMStrategy.this /*reader*/); Modified: branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestJournalShutdown.java =================================================================== --- branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestJournalShutdown.java 2013-01-23 18:17:20 UTC (rev 6819) +++ branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestJournalShutdown.java 2013-01-23 18:59:30 UTC (rev 6820) @@ -30,7 +30,6 @@ import java.lang.ref.WeakReference; import java.util.Properties; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import junit.framework.TestCase2; @@ -202,13 +201,6 @@ try { try { -// final Object tst = new Object() { -// protected void finalize() throws Throwable { -// super.finalize(); -// System.err.println("Finalizer called!!"); -// } -// }; - for (int i = 0; i < limit; i++) { final Journal jnl = new Journal(properties) { @@ -221,6 +213,7 @@ + ncreated + ", nalive=" + nunfinalized); + // ensure that journals are destroyed when the are finalized. destroy(); } }; @@ -273,7 +266,7 @@ */ final AbstractTask task2 = new RegisterIndexTask( jnl.getConcurrencyManager(), "name", - new IndexMetadata("name", UUID.randomUUID()));; + new IndexMetadata("name", UUID.randomUUID())); /* * Submit one of the tasks and *wait* for its Future. @@ -282,7 +275,6 @@ jnl.getConcurrencyManager().submit(task1b).get(); jnl.getConcurrencyManager().submit(task2).get(); - } catch (/*Execution*/Exception e) { log.error("Problem registering index: " + e, e); } @@ -387,7 +379,7 @@ } } if (destroyed > 0) { - System.err.println("Destroyed " + destroyed + " non finalized journals"); + log.error("Destroyed " + destroyed + " non finalized journals"); } } Modified: branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestTx.java =================================================================== --- branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestTx.java 2013-01-23 18:17:20 UTC (rev 6819) +++ branches/READ_CACHE/bigdata/src/test/com/bigdata/journal/TestTx.java 2013-01-23 18:59:30 UTC (rev 6820) @@ -1256,7 +1256,7 @@ */ public void testStress() throws InterruptedException, ExecutionException { - final int ntx = 100; // 30; + final int ntx = 30; final int nops = 10000; final Journal store = getStore(); Modified: branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-01-23 18:17:20 UTC (rev 6819) +++ branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-01-23 18:59:30 UTC (rev 6820) @@ -163,8 +163,6 @@ // Start 3rd service. final HAGlue serverC = startC(); - awaitKBExists(serverC); - // Wait until the quorum is fully met. The token should not change. assertEquals(token, awaitFullyMetQuorum()); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |