From: <tho...@us...> - 2010-12-14 16:56:49
Revision: 4007
          http://bigdata.svn.sourceforge.net/bigdata/?rev=4007&view=rev
Author:   thompsonbry
Date:     2010-12-14 16:56:43 +0000 (Tue, 14 Dec 2010)

Log Message:
-----------
Javadoc on BufferedWrite, added explicit synchronization, safe release of the direct ByteBuffer and a variety of other things for the paranoid. Modified WriteCache to invoke BufferedWrite#reset() when non-null.

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java  2010-12-14 16:50:21 UTC (rev 4006)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java  2010-12-14 16:56:43 UTC (rev 4007)
@@ -27,99 +27,242 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicReference;
+import com.bigdata.counters.CAT;
+import com.bigdata.counters.CounterSet;
 import com.bigdata.io.DirectBufferPool;
 import com.bigdata.io.FileChannelUtility;
 import com.bigdata.io.IReopenChannel;
 import com.bigdata.rwstore.RWStore;
 
 /**
- * The BufferedWrite merges/elides sorted scattered writes to minimise
- * IO requests and maximise IO rates.
+ * The BufferedWrite merges/elides sorted scattered writes to minimize IO
+ * requests and maximize IO rates. This has a net positive effect on SAS, SATA,
+ * and SSD.
  *
  * @author Martyn Cutcher
- *
+ *
+ * @todo unit tests (this is used by RWStore and so is in general tested as part
+ *       of that class, but it does not have its own test suite and it should not
+ *       be all that difficult to write one, especially if we factor out an API
+ *       for reporting the slotSize and then use a mock object in place of the
+ *       RWStore).
+ *
+ * @todo harmonize with {@link CounterSet} for reporting purposes.
  */
 public class BufferedWrite {
-    final RWStore m_store;
-    final ByteBuffer m_data;
-    long m_startAddr = -1;
-    long m_endAddr = 0;
+
+    /**
+     * Used to determine the size of the allocation slot onto which a record is
+     * being written. This is used to pad the size of the IO out to the size of
+     * the slot. This can improve the IO efficiency when the slots are sized so
+     * as to fall on multiples of sector boundaries.
+     */
+    private final RWStore m_store;
+
+    /**
+     * The direct {@link ByteBuffer} used to combine writes which are contiguous
+     * into a single IO.
+     */
+//  private final ByteBuffer m_data;
+    private final AtomicReference<ByteBuffer> m_data = new AtomicReference<ByteBuffer>();
+
+    /**
+     * The offset on the backing channel at which the data in {@link #m_data}
+     * will be written when it is flushed to the backing channel. This is
+     * <code>-1</code> initially (and when reset) as a flag indicating that
+     * there is no data in {@link #m_data} and that the next record written by
+     * the caller on the buffer will assign the {@link #m_startAddr starting
+     * offset} of the data in the buffer.
+     * <p>
+     * Guarded by synchronized(this) (paranoia)
+     */
+    private long m_startAddr = -1;
+
+    /**
+     * The offset of the backing channel at which the next byte would be written
+     * if it were appended to the data already present in {@link #m_data}.
+     * <p>
+     * Guarded by synchronized(this) (paranoia)
+     */
+    private long m_endAddr = 0;
 
-    long m_dataBytes = 0;
-    long m_dataWrites = 0;
-    long m_fileWrites = 0;
+    /*
+     * Counters.
+     */
+    private final CAT m_dataBytes = new CAT();
+    private final CAT m_dataWrites = new CAT();
+    private final CAT m_fileWrites = new CAT();
 
     public BufferedWrite(final RWStore store) throws InterruptedException {
+
+        if (store == null)
+            throw new IllegalArgumentException();
+
         m_store = store;
-        m_data = DirectBufferPool.INSTANCE.acquire();
+
+        m_data.set( DirectBufferPool.INSTANCE.acquire() );
+
     }
-
+
+    /**
+     * Release the direct buffer associated with this object.
+     *
+     * @throws InterruptedException
+     */
+//  /*
+//   * Note: Consider adding synchronized(this) here to guard against the
+//   * possibility that the buffer could be released (and hence recycled) while
+//   * a write operation was occurring concurrently. However, this raises the
+//   * specter that a lock ordering problem could cause a deadlock.
+//   */
+//  synchronized
     public void release() throws InterruptedException {
-        DirectBufferPool.INSTANCE.release(m_data);
+
+        final ByteBuffer tmp = m_data.get();
+
+        if (tmp == null) {
+
+            // Already closed.
+            return;
+
+        }
+
+        if (m_data.compareAndSet(tmp/* expected */, null/* update */)) {
+
+            DirectBufferPool.INSTANCE.release(tmp);
+
+        }
+
     }
-
-    public int write(final long offset, final ByteBuffer data, final IReopenChannel<FileChannel> opener) throws IOException {
-        int nwrites = 0;
+
+    /**
+     * Buffer a write.
+     *
+     * @param offset
+     *            The offset on the backing channel at which the data should be
+     *            written.
+     * @param data
+     *            The data.
+     * @param opener
+     *            The object which knows how to re-open the backing channel.
+     * @return The #of write IOs performed during this method call.
+     *
+     * @throws IOException
+     */
+    synchronized
+    public int write(final long offset, final ByteBuffer data,
+            final IReopenChannel<FileChannel> opener) throws IOException {
 
-        m_dataWrites++;
+        m_dataWrites.increment();
 
-        int data_len = data.remaining();
-        int slot_len = m_store.getSlotSize(data_len);
+        final int data_len = data.remaining();
+        final int slot_len = m_store.getSlotSize(data_len);
+
+        int nwrites = 0;
+        final ByteBuffer m_data = this.m_data.get();
 
         if (slot_len > m_data.remaining()) {
+            /*
+             * There is not enough room in [m_data] to absorb the caller's data
+             * record, so we have to flush first.
+             */
            nwrites += flush(opener);
         }
 
         if (m_startAddr == -1) {
+            /*
+             * The buffer will begin to absorb data destined for the [offset]
+             * into the backing channel specified for the caller's data record.
+             */
            m_startAddr = m_endAddr = offset;
         } else if (m_endAddr != offset) {
-            // if this is NOT a contiguous write then flush existing content
+            /*
+             * If this is NOT a contiguous write then flush existing content.
+             * After the flush, the buffer will begin to absorb data destined
+             * for the [offset] into the backing channel specified for the
+             * caller's data record.
+             */
            nwrites += flush(opener);
 
            m_startAddr = m_endAddr = offset;
         }
 
+        // copy the caller's record into the buffer.
         m_data.put(data);
+
+        // update the file offset by the size of the allocation slot
         m_endAddr += slot_len;
 
-        long pos = m_endAddr - m_startAddr;
+        // update the buffer position by the size of the allocation slot.
+        final long pos = m_endAddr - m_startAddr;
         m_data.position((int) pos);
 
         return nwrites;
     }
 
-    public int flush(final IReopenChannel<FileChannel> opener) throws IOException {
-        m_dataBytes += m_data.position();
+    /**
+     * Flush buffered data to the backing channel.
+     *
+     * @param opener
+     *            The object which knows how to re-open the backing channel.
+     *
+     * @return The #of write IOs performed during this method call.
+     *
+     * @throws IOException
+     */
+    synchronized
+    public int flush(final IReopenChannel<FileChannel> opener)
            throws IOException {
+
+        final ByteBuffer m_data = this.m_data.get();
+
+        if (m_data.remaining() == 0) {
+            // NOP.
+            return 0;
+        }
+
+        // increment by the amount of data currently in the buffer.
+        m_dataBytes.add( m_data.position() );
+
+        // write out the data in the buffer onto the backing channel.
         m_data.flip();
 
         final int nwrites = FileChannelUtility.writeAll(opener, m_data, m_startAddr);
-        m_fileWrites++;
-
+        m_fileWrites.add(nwrites);
+
         reset();
 
         return nwrites;
     }
+
+    /**
+     * Reset the buffer position and limit and clear the starting offset on the
+     * file to <code>-1</code>.
+     */
+    synchronized
+    public void reset() {
+
+        final ByteBuffer m_data = this.m_data.get();
+
+        // reset the buffer state.
+        m_data.position(0);
+        m_data.limit(m_data.capacity());
+
+        m_startAddr = -1;
+        m_endAddr = 0;
+
+    }
 
-    public String getStats(StringBuffer buf, boolean reset) {
-        String ret = "BufferedWrites, data: " + m_dataWrites + ", file: " + m_fileWrites + ", bytes: " + m_dataBytes;
+    public String getStats(final StringBuffer buf, final boolean reset) {
+
+        final String ret = "BufferedWrites, data: " + m_dataWrites + ", file: " + m_fileWrites + ", bytes: " + m_dataBytes;
 
         if (buf != null) {
            buf.append(ret + "\n");
         }
 
         if (reset) {
-            m_dataBytes = m_fileWrites = m_dataWrites = 0;
+            m_dataBytes.set(0L);
+            m_fileWrites.set(0L);
+            m_dataWrites.set(0L);
         }
 
         return ret;
     }
 
-    /**
-     * Caled by flush and also prior to use by the WriteCache.
-     */
-    public void reset() {
-        m_data.position(0);
-        m_data.limit(m_data.capacity());
-
-        m_startAddr = -1;
-        m_endAddr = 0;
-    }
 }
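The coalescing behavior documented in the class javadoc above (append records that are contiguous on the backing channel, flush whenever a gap appears or the buffer is full) can be illustrated with a small standalone sketch. This is only a sketch, not the bigdata class: it assumes a plain heap ByteBuffer and a FileChannel in place of the DirectBufferPool / IReopenChannel machinery, and it omits the slot-size padding and the CAT counters; the CoalescingWriter name and the 1 MB buffer size are illustrative only.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    // Illustrative only: merges writes that are contiguous on the backing
    // channel into a single positional IO. Records larger than the staging
    // buffer are not handled here.
    class CoalescingWriter {

        private final ByteBuffer buf = ByteBuffer.allocate(1 << 20); // staging buffer
        private long startOffset = -1; // file offset of buf[0]; -1 when the buffer is empty
        private long endOffset = 0;    // file offset one past the buffered data

        /** Buffer a record destined for [offset]; returns the #of flushes issued. */
        synchronized int write(final FileChannel ch, final long offset,
                final ByteBuffer data) throws IOException {
            int nwrites = 0;
            if (data.remaining() > buf.remaining()) {
                nwrites += flush(ch); // no room left: push out what we have first
            }
            if (startOffset == -1) {
                startOffset = endOffset = offset; // buffer was empty: start here
            } else if (endOffset != offset) {
                nwrites += flush(ch); // not contiguous: flush, then restart at [offset]
                startOffset = endOffset = offset;
            }
            endOffset += data.remaining();
            buf.put(data); // append the record to the staging buffer
            return nwrites;
        }

        /** Write out any buffered data at [startOffset] (counted as one flush). */
        synchronized int flush(final FileChannel ch) throws IOException {
            if (buf.position() == 0)
                return 0; // nothing buffered
            buf.flip();
            while (buf.hasRemaining()) {
                ch.write(buf, startOffset + buf.position());
            }
            buf.clear();
            startOffset = -1;
            endOffset = 0;
            return 1;
        }
    }

The property that matters is the endOffset != offset test: as long as the caller presents records in file order with no gaps, an arbitrary run of small writes collapses into a single positional write against the channel.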
Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java  2010-12-14 16:50:21 UTC (rev 4006)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java  2010-12-14 16:56:43 UTC (rev 4007)
@@ -1645,7 +1645,7 @@
              *
              * If there is a BufferedWrite then ensure it is reset.
              */
-            if (m_bufferedWrite == null) {
+            if (m_bufferedWrite != null) {
                 m_bufferedWrite.reset();
             }
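The "safe release of the direct ByteBuffer" mentioned in the log message comes down to the AtomicReference plus compareAndSet in release() above: whichever caller wins the CAS returns the buffer to the pool, and every later caller sees null and does nothing, so the buffer can never be recycled twice. A minimal sketch of that one-shot idiom, with a hypothetical BufferPool interface standing in for DirectBufferPool:

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative only: returns a pooled buffer at most once, even when
    // close() is invoked concurrently from several threads.
    class OneShotBufferHolder {

        interface BufferPool { // hypothetical stand-in for DirectBufferPool
            void release(ByteBuffer b);
        }

        private final AtomicReference<ByteBuffer> ref;
        private final BufferPool pool;

        OneShotBufferHolder(final ByteBuffer buf, final BufferPool pool) {
            this.ref = new AtomicReference<ByteBuffer>(buf);
            this.pool = pool;
        }

        void close() {
            final ByteBuffer tmp = ref.get();
            if (tmp == null) {
                return; // already closed by another caller.
            }
            // Only the caller that wins the compare-and-set hands the buffer
            // back to the pool; a losing caller simply returns, so a double
            // release is impossible without taking any lock.
            if (ref.compareAndSet(tmp /* expected */, null /* update */)) {
                pool.release(tmp);
            }
        }
    }

The WriteCache hunk above is the complementary guard on the caller side: per the log message, reset() is now invoked only when the BufferedWrite reference is non-null.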