From: <mar...@us...> - 2010-09-17 10:19:33
|
Revision: 3576 http://bigdata.svn.sourceforge.net/bigdata/?rev=3576&view=rev Author: martyncutcher Date: 2010-09-17 10:19:27 +0000 (Fri, 17 Sep 2010) Log Message: ----------- Implement BufferedWrites and Allocation locality enhancements Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWriteCache.java Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-09-17 10:19:27 UTC (rev 3576) @@ -0,0 +1,108 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +package com.bigdata.io.writecache; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import com.bigdata.io.DirectBufferPool; +import com.bigdata.io.FileChannelUtility; +import com.bigdata.io.IReopenChannel; +import com.bigdata.rwstore.RWStore; + +/** + * The BufferedWrite merges/elides sorted scattered writes to minimise + * IO requests and maximise IO rates. + * + * @author Martyn Cutcher + * + */ +public class BufferedWrite { + final RWStore m_store; + final ByteBuffer m_data; + long m_startAddr = -1; + long m_endAddr = 0; + + long m_dataBytes = 0; + long m_dataWrites = 0; + long m_fileWrites = 0; + + public BufferedWrite(final RWStore store) throws InterruptedException { + m_store = store; + m_data = DirectBufferPool.INSTANCE.acquire(); + } + + public void write(final long offset, final ByteBuffer data, final IReopenChannel<FileChannel> opener) throws IOException { + m_dataWrites++; + + int data_len = data.remaining(); + int slot_len = m_store.getSlotSize(data_len); + + if (slot_len > m_data.remaining()) { + flush(opener); + } + + if (m_startAddr == -1) { + m_startAddr = m_endAddr = offset; + } else if (m_endAddr != offset) { + // if this is NOT a contiguous write then flush existing content + flush(opener); + m_startAddr = m_endAddr = offset; + } + m_data.put(data); + m_endAddr += slot_len; + long pos = m_endAddr - m_startAddr; + m_data.position((int) pos); + } + + public void flush(final IReopenChannel<FileChannel> opener) throws IOException { + m_dataBytes += m_data.position(); + + m_data.flip(); + FileChannelUtility.writeAll(opener, m_data, m_startAddr); + m_fileWrites++; + + m_data.position(0); + m_data.limit(m_data.capacity()); + + m_startAddr = -1; + m_endAddr = 0; + } + + public String getStats(StringBuffer buf, boolean reset) { + String ret = "BufferedWrites, data: " + m_dataWrites + ", file: " + m_fileWrites + ", bytes: " + m_dataBytes; + + if (buf != null) { + buf.append(ret + "\n"); + } + + if (reset) { + m_dataBytes = m_fileWrites = m_dataWrites = 0; + } + + return ret; + } +} Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-09-16 21:09:42 UTC (rev 3575) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-09-17 10:19:27 UTC (rev 3576) @@ -1579,6 +1579,12 @@ * for the RW mode. Look into putting a thread pool to work on the scattered * writes. This could be part of a refactor to apply a thread pool to IOs * and related to prefetch and {@link Memoizer} behaviors. + * + * FIXME To maximize IO rates we should attempt to elide/merge contiguous + * writes. To do this can double-buffer in writeOnChannel. This also + * provides an opportunity to write the full slot size of the RWStore that + * may have advantages, particularly for an SSD, since it may avoid a + * pre-write read to populate the write sector. */ public static class FileChannelScatteredWriteCache extends WriteCache { @@ -1587,6 +1593,7 @@ */ private final IReopenChannel<FileChannel> opener; + private final BufferedWrite m_bufferedWrite; /** * @param baseOffset * An offset @@ -1596,7 +1603,8 @@ * @throws InterruptedException */ public FileChannelScatteredWriteCache(final ByteBuffer buf, final boolean useChecksum, - final boolean isHighlyAvailable, final boolean bufferHasData, final IReopenChannel<FileChannel> opener) + final boolean isHighlyAvailable, final boolean bufferHasData, final IReopenChannel<FileChannel> opener, + final BufferedWrite bufferedWrite) throws InterruptedException { super(buf, true/* scatteredWrites */, useChecksum, isHighlyAvailable, bufferHasData); @@ -1605,6 +1613,8 @@ throw new IllegalArgumentException(); this.opener = opener; + + m_bufferedWrite = bufferedWrite; } @@ -1646,13 +1656,23 @@ view.position(pos); final long offset = entry.getKey(); // offset in file to update - - nwrites += FileChannelUtility.writeAll(opener, view, offset); + if (m_bufferedWrite == null) { + nwrites += FileChannelUtility.writeAll(opener, view, offset); + } else { + m_bufferedWrite.write(offset, view, opener); + } // if (log.isInfoEnabled()) // log.info("writing to: " + offset); registerWriteStatus(offset, md.recordLength, 'W'); } + if (m_bufferedWrite != null) { + m_bufferedWrite.flush(opener); + + if (log.isTraceEnabled()) + log.trace(m_bufferedWrite.getStats(null, true)); + } + final WriteCacheCounters counters = this.counters.get(); counters.nwrite += nwrites; counters.bytesWritten += nbytes; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-09-16 21:09:42 UTC (rev 3575) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-09-17 10:19:27 UTC (rev 3576) @@ -117,6 +117,7 @@ if (hasFree()) { m_freeList.add(this); + m_freeWaiting = false; } } @@ -190,6 +191,7 @@ // added back if ((m_freeTransients == m_freeBits) && (m_freeTransients != 0)) { m_freeList.add(this); + m_freeWaiting = false; } m_freeTransients = 0; @@ -325,9 +327,7 @@ public String getStats(final AtomicLong counter) { - final StringBuilder sb = new StringBuilder("Block size : " + m_size - + " start : " + getStartAddr() + " free : " + m_freeBits - + "\r\n"); + final StringBuilder sb = new StringBuilder(getSummaryStats()); final Iterator<AllocBlock> iter = m_allocBlocks.iterator(); while (iter.hasNext()) { @@ -336,12 +336,20 @@ break; } sb.append(block.getStats(null) + "\r\n"); - counter.addAndGet(block.getAllocBits() * m_size); + if (counter != null) + counter.addAndGet(block.getAllocBits() * m_size); } return sb.toString(); } + public String getSummaryStats() { + + return"Block size : " + m_size + + " start : " + getStartAddr() + " free : " + m_freeBits + + "\r\n"; + } + public boolean verify(int addr) { if (addr >= m_startAddr && addr < m_endAddr) { @@ -372,6 +380,8 @@ return false; } + private boolean m_freeWaiting = true; + public boolean free(final int addr, final int size) { if (addr < 0) { final int offset = ((-addr) & RWStore.OFFSET_BITS_MASK) - 3; // bit adjust @@ -382,8 +392,14 @@ if (((AllocBlock) m_allocBlocks.get(block)) .freeBit(offset % nbits)) { // bit adjust - if (m_freeBits++ == 0) { + + // Only add back to the free list if at least 3000 bits avail + if (m_freeBits++ == 0 && false) { + m_freeWaiting = false; m_freeList.add(this); + } else if (m_freeWaiting && m_freeBits == 3000) { + m_freeWaiting = false; + m_freeList.add(this); } } else { m_freeTransients++; @@ -445,6 +461,13 @@ if (log.isTraceEnabled()) log.trace("Remove from free list"); m_freeList.remove(this); + m_freeWaiting = true; + + // Should have been first on list, now check for first + if (m_freeList.size() > 0) { + FixedAllocator nxt = (FixedAllocator) m_freeList.get(0); + System.out.println("Freelist head: " + nxt.getSummaryStats()); + } } addr += (count * 32 * m_bitSize); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-09-16 21:09:42 UTC (rev 3575) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-09-17 10:19:27 UTC (rev 3576) @@ -50,6 +50,7 @@ import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IReopenChannel; +import com.bigdata.io.writecache.BufferedWrite; import com.bigdata.io.writecache.WriteCache; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.CommitRecordIndex; @@ -327,6 +328,8 @@ private ReopenFileChannel m_reopener = null; + BufferedWrite m_bufferedWrite; + class WriteCacheImpl extends WriteCache.FileChannelScatteredWriteCache { public WriteCacheImpl(final ByteBuffer buf, final boolean useChecksum, @@ -335,7 +338,7 @@ throws InterruptedException { super(buf, useChecksum, m_quorum!=null&&m_quorum - .isHighlyAvailable(), bufferHasData, opener); + .isHighlyAvailable(), bufferHasData, opener, m_bufferedWrite); } @@ -379,6 +382,7 @@ * @param fileMetadataView * @param readOnly * @param quorum + * @throws InterruptedException */ public RWStore(final FileMetadataView fileMetadataView, final boolean readOnly, @@ -412,6 +416,12 @@ } catch (IOException e1) { throw new RuntimeException(e1); } + + try { + m_bufferedWrite = new BufferedWrite(this); + } catch (InterruptedException e1) { + m_bufferedWrite = null; + } int buffers = m_fmv.getFileMetadata().writeCacheBufferCount; log.warn("RWStore using writeCacheService with buffers: " + buffers); @@ -2122,6 +2132,7 @@ str.append("Allocation: " + stats[i].m_blockSize); str.append(", slots: " + stats[i].m_filledSlots + "/" + stats[i].m_reservedSlots); str.append(", storage: " + filled + "/" + reserved); + str.append(", usage: " + (filled * 100 / reserved) + "%"); str.append("\n"); } str.append("Total - file: " + convertAddr(m_fileSize) + ", slots: " + tfilledSlots + "/" + treservedSlots + ", storage: " + tfilled + "/" + treserved + "\n"); @@ -2942,4 +2953,17 @@ return ret; } + + + public int getSlotSize(int data_len) { + int i = 0; + + int ret = m_minFixedAlloc; + while (data_len > ret) { + i++; + ret = 64 * m_allocSizes[i]; + } + + return ret; + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java 2010-09-16 21:09:42 UTC (rev 3575) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java 2010-09-17 10:19:27 UTC (rev 3576) @@ -32,6 +32,7 @@ import org.apache.log4j.Logger; import com.bigdata.io.IReopenChannel; +import com.bigdata.io.writecache.BufferedWrite; import com.bigdata.io.writecache.WriteCache; import com.bigdata.io.writecache.WriteCacheService; import com.bigdata.io.writecache.WriteCache.FileChannelScatteredWriteCache; @@ -53,7 +54,6 @@ super(nbuffers, true/* useChecksum */, fileExtent, opener, quorum); - } /** @@ -72,7 +72,7 @@ return new FileChannelScatteredWriteCache(buf, true/* useChecksum */, highlyAvailable, bufferHasData, - (IReopenChannel<FileChannel>) opener); + (IReopenChannel<FileChannel>) opener, null); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2010-09-16 21:09:42 UTC (rev 3575) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2010-09-17 10:19:27 UTC (rev 3576) @@ -1780,7 +1780,7 @@ case RW: return new FileChannelScatteredWriteCache(buf, useChecksum, isHighlyAvailable, bufferHasData, - (IReopenChannel<FileChannel>) opener); + (IReopenChannel<FileChannel>) opener, null); default: throw new UnsupportedOperationException(); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWriteCache.java 2010-09-16 21:09:42 UTC (rev 3575) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWriteCache.java 2010-09-17 10:19:27 UTC (rev 3576) @@ -528,7 +528,7 @@ // ctor correct rejection tests: opener is null. try { new WriteCache.FileChannelScatteredWriteCache(buf, - useChecksum, isHighlyAvailable, bufferHasData, null/* opener */); + useChecksum, isHighlyAvailable, bufferHasData, null/* opener */, null); fail("Expected: " + IllegalArgumentException.class); } catch (IllegalArgumentException ex) { if (log.isInfoEnabled()) @@ -537,7 +537,7 @@ // allocate write cache using our buffer. final WriteCache writeCache = new WriteCache.FileChannelScatteredWriteCache( - buf, useChecksum, isHighlyAvailable, bufferHasData, opener); + buf, useChecksum, isHighlyAvailable, bufferHasData, opener, null); // verify the write cache self-reported capacity. assertEquals(DirectBufferPool.INSTANCE.getBufferCapacity() @@ -869,9 +869,9 @@ ByteBuffer data2 = getRandomData(20 * 1024); int chk2 = ChecksumUtility.threadChk.get().checksum(data2, 0/* offset */, data2.limit()); WriteCache cache1 = new WriteCache.FileChannelScatteredWriteCache(buf, true, true, - false, opener); + false, opener, null); WriteCache cache2 = new WriteCache.FileChannelScatteredWriteCache(buf2, true, true, - false, opener); + false, opener, null); // write first data buffer cache1.write(addr1, data1, chk1); @@ -976,7 +976,7 @@ // allocate write cache using our buffer. final WriteCache writeCache = new WriteCache.FileChannelScatteredWriteCache( - buf, useChecksum, isHighlyAvailable, bufferHasData, opener); + buf, useChecksum, isHighlyAvailable, bufferHasData, opener, null); /* * First write 500 records into the cache and confirm they can all be read okay This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |