From: <mar...@us...> - 2010-08-02 09:33:24
|
Revision: 3384 http://bigdata.svn.sourceforge.net/bigdata/?rev=3384&view=rev Author: martyncutcher Date: 2010-08-02 09:33:17 +0000 (Mon, 02 Aug 2010) Log Message: ----------- Add support for breadcrumbing address usage - add/write/clear/remove - to track down rare errors. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-08-02 00:49:40 UTC (rev 3383) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-08-02 09:33:17 UTC (rev 3384) @@ -1634,6 +1634,9 @@ view.position(pos); final long offset = entry.getKey(); // offset in file to update + + registerWriteStatus(offset, md.recordLength, 'W'); + nwrites += FileChannelUtility.writeAll(opener, view, offset); // if (log.isInfoEnabled()) // log.info("writing to: " + offset); @@ -1757,6 +1760,10 @@ } } + protected void registerWriteStatus(long offset, int length, char action) { + // NOP to be overidden for debug if required + } + boolean m_written = false; private long lastOffset; @@ -1792,7 +1799,10 @@ * Using the conditional remove on ConcurrentMap guards against * this. */ - serviceRecordMap.remove(addr, this); + boolean removed = serviceRecordMap.remove(addr, this); + + registerWriteStatus(addr, 0, removed ? 
'R' : 'L'); + } } else { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2010-08-02 00:49:40 UTC (rev 3383) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2010-08-02 09:33:17 UTC (rev 3384) @@ -313,8 +313,22 @@ * {@link #writeChk(long, ByteBuffer, int)}). */ final private WriteCache[] buffers; - + /** + * Debug arrays to chase down write/removal errors. + * + * Toggle comment appropriately to activate/deactivate + */ + final long[] addrsUsed = new long[4024 * 1024]; + int addrsUsedCurs = 0; + final char[] addrActions = new char[addrsUsed.length]; + final int[] addrLens = new int[addrsUsed.length]; +// final long[] addrsUsed = null; +// int addrsUsedCurs = 0; +// final char[] addrActions = null; +// final int[] addrLens = null; + + /** * The current file extent. */ final private AtomicLong fileExtent = new AtomicLong(-1L); @@ -1422,6 +1436,7 @@ final WriteCache cache = acquireForWriter(); try { + debugAddrs(offset, 0, 'A'); // write on the cache. if (cache.write(offset, data, chk, useChecksum)) { @@ -1598,6 +1613,19 @@ } + public void debugAddrs(long offset, int length, char c) { + if (addrsUsed != null) { + addrsUsed[addrsUsedCurs] = offset; + addrActions[addrsUsedCurs] = c; + addrLens[addrsUsedCurs] = length; + + addrsUsedCurs++; + if (addrsUsedCurs >= addrsUsed.length) { + addrsUsedCurs = 0; + } + } + } + /** * Write a record whose size (when combined with the optional checksum) is * larger than the capacity of an individual {@link WriteCache} buffer. 
This @@ -1917,6 +1945,7 @@ if (cache == null) return; acquireForWriter(); // in case current + debugAddrs(offset, 0, 'F'); try { cache.clearAddrMap(offset); } finally { @@ -1928,6 +1957,41 @@ } /** + * An array of writeCache actions is maintained that can be used + * to provide a breadcrumb of how that address has been written, saved, + * freed or removed. + * + * Write errors often show up as a checksum error, so the length of + * data written to the address cab be crucial information in determining the + * root of any problem. + * + * @param address for which info requested + * @return summary of writeCache actions + */ + public String addrDebugInfo(final long paddr) { + if (addrsUsed == null) { + return "No WriteCache debug info"; + } + + StringBuffer ret = new StringBuffer(); + // first see if address was ever written + boolean written = false; + for (int i = 0; i < addrsUsed.length; i++) { + if (i == addrsUsedCurs) { + ret.append("|...|"); + } + if (addrsUsed[i] == paddr) { + ret.append(addrActions[i]); + if (addrActions[i]=='W') { + ret.append("[" + addrLens[i] + "]"); + } + } + } + + return ret.toString(); + } + + /** * Performance counters for the {@link WriteCacheService}. * * @author <a href="mailto:tho...@us...">Bryan This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-09-03 11:14:23
|
Revision: 3501 http://bigdata.svn.sourceforge.net/bigdata/?rev=3501&view=rev Author: martyncutcher Date: 2010-09-03 11:14:17 +0000 (Fri, 03 Sep 2010) Log Message: ----------- Fix writeCache corruption on clearAddrMap Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-09-03 00:27:45 UTC (rev 3500) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-09-03 11:14:17 UTC (rev 3501) @@ -821,9 +821,9 @@ if (useChecksum) { - final int chk = tmp.getInt(pos + md.recordLength - 4); + final int chk = tmp.getInt(pos + reclen); - if (chk != ChecksumUtility.threadChk.get().checksum(b, 0/* offset */, b.length)) { + if (chk != ChecksumUtility.threadChk.get().checksum(b, 0/* offset */, reclen)) { // Note: [offset] is a (possibly relative) file offset. throw new ChecksumError("offset=" + offset); @@ -1713,6 +1713,9 @@ * write also and the buffer will be flushed either on commit or a * subsequent write. * + * A problem previously existed with unsynchronized access to the ByteBuffer. + * Resulting in a conflict over the position() and buffer corruption. + * * @param addr * The address of a cache entry. 
* @@ -1735,15 +1738,18 @@ final ByteBuffer tmp = acquire(); try { if (tmp.remaining() >= 12) { - int spos = tmp.position(); - tmp.putLong(addr); - tmp.putInt(0); - if (checker != null) { - // update the checksum (no side-effects on [data]) - ByteBuffer chkBuf = tmp.asReadOnlyBuffer(); - chkBuf.position(spos); - chkBuf.limit(tmp.position()); - checker.update(chkBuf); + // We must synchronize + synchronized (tmp) { + int spos = tmp.position(); + tmp.putLong(addr); + tmp.putInt(0); + if (checker != null) { + // update the checksum (no side-effects on [data]) + ByteBuffer chkBuf = tmp.asReadOnlyBuffer(); + chkBuf.position(spos); + chkBuf.limit(tmp.position()); + checker.update(chkBuf); + } } } } finally { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2010-09-03 00:27:45 UTC (rev 3500) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2010-09-03 11:14:17 UTC (rev 3501) @@ -1944,7 +1944,7 @@ final WriteCache cache = recordMap.remove(offset); if (cache == null) return; - acquireForWriter(); // in case current + final WriteCache cur = acquireForWriter(); // in case current debugAddrs(offset, 0, 'F'); try { cache.clearAddrMap(offset); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-09-23 16:09:19
|
Revision: 3616 http://bigdata.svn.sourceforge.net/bigdata/?rev=3616&view=rev Author: martyncutcher Date: 2010-09-23 16:09:13 +0000 (Thu, 23 Sep 2010) Log Message: ----------- Fix nwrites stats when using BufferedWrite Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-09-23 14:00:36 UTC (rev 3615) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-09-23 16:09:13 UTC (rev 3616) @@ -55,34 +55,38 @@ m_data = DirectBufferPool.INSTANCE.acquire(); } - public void write(final long offset, final ByteBuffer data, final IReopenChannel<FileChannel> opener) throws IOException { + public int write(final long offset, final ByteBuffer data, final IReopenChannel<FileChannel> opener) throws IOException { + int nwrites = 0; + m_dataWrites++; int data_len = data.remaining(); int slot_len = m_store.getSlotSize(data_len); if (slot_len > m_data.remaining()) { - flush(opener); + nwrites += flush(opener); } if (m_startAddr == -1) { m_startAddr = m_endAddr = offset; } else if (m_endAddr != offset) { // if this is NOT a contiguous write then flush existing content - flush(opener); + nwrites += flush(opener); m_startAddr = m_endAddr = offset; } m_data.put(data); m_endAddr += slot_len; long pos = m_endAddr - m_startAddr; m_data.position((int) pos); + + return nwrites; } - public void flush(final IReopenChannel<FileChannel> opener) throws IOException { + public int flush(final IReopenChannel<FileChannel> opener) throws IOException { m_dataBytes += m_data.position(); m_data.flip(); - FileChannelUtility.writeAll(opener, m_data, 
m_startAddr); + final int nwrites = FileChannelUtility.writeAll(opener, m_data, m_startAddr); m_fileWrites++; m_data.position(0); @@ -90,6 +94,8 @@ m_startAddr = -1; m_endAddr = 0; + + return nwrites; } public String getStats(StringBuffer buf, boolean reset) { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-09-23 14:00:36 UTC (rev 3615) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-09-23 16:09:13 UTC (rev 3616) @@ -1659,7 +1659,7 @@ if (m_bufferedWrite == null) { nwrites += FileChannelUtility.writeAll(opener, view, offset); } else { - m_bufferedWrite.write(offset, view, opener); + nwrites += m_bufferedWrite.write(offset, view, opener); } // if (log.isInfoEnabled()) // log.info("writing to: " + offset); @@ -1667,7 +1667,7 @@ } if (m_bufferedWrite != null) { - m_bufferedWrite.flush(opener); + nwrites += m_bufferedWrite.flush(opener); if (log.isTraceEnabled()) log.trace(m_bufferedWrite.getStats(null, true)); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-12-14 16:50:27
|
Revision: 4006 http://bigdata.svn.sourceforge.net/bigdata/?rev=4006&view=rev Author: martyncutcher Date: 2010-12-14 16:50:21 +0000 (Tue, 14 Dec 2010) Log Message: ----------- add reset to BufferedWrite Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-12-12 22:22:27 UTC (rev 4005) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-12-14 16:50:21 UTC (rev 4006) @@ -93,12 +93,8 @@ final int nwrites = FileChannelUtility.writeAll(opener, m_data, m_startAddr); m_fileWrites++; - m_data.position(0); - m_data.limit(m_data.capacity()); + reset(); - m_startAddr = -1; - m_endAddr = 0; - return nwrites; } @@ -115,4 +111,15 @@ return ret; } + + /** + * Caled by flush and also prior to use by the WriteCache. + */ + public void reset() { + m_data.position(0); + m_data.limit(m_data.capacity()); + + m_startAddr = -1; + m_endAddr = 0; + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-12-12 22:22:27 UTC (rev 4005) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-12-14 16:50:21 UTC (rev 4006) @@ -1641,8 +1641,14 @@ /* * Retrieve the sorted write iterator and write each block to the - * file + * file. + * + * If there is a BufferedWrite then ensure it is reset. 
*/ + if (m_bufferedWrite == null) { + m_bufferedWrite.reset(); + } + int nwrites = 0; final Iterator<Entry<Long, RecordMetadata>> entries = recordMap.entrySet().iterator(); while (entries.hasNext()) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-12-14 16:56:49
|
Revision: 4007 http://bigdata.svn.sourceforge.net/bigdata/?rev=4007&view=rev Author: thompsonbry Date: 2010-12-14 16:56:43 +0000 (Tue, 14 Dec 2010) Log Message: ----------- Javadoc on BufferedWrite, added explicit synchronization, safe release of the direct ByteBuffer and a variety of other things for the paranoid. Modified WriteCache to invoke BufferedWrite#reset() when non-null. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-12-14 16:50:21 UTC (rev 4006) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-12-14 16:56:43 UTC (rev 4007) @@ -27,99 +27,242 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicReference; +import com.bigdata.counters.CAT; +import com.bigdata.counters.CounterSet; import com.bigdata.io.DirectBufferPool; import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IReopenChannel; import com.bigdata.rwstore.RWStore; /** - * The BufferedWrite merges/elides sorted scattered writes to minimise - * IO requests and maximise IO rates. + * The BufferedWrite merges/elides sorted scattered writes to minimize IO + * requests and maximize IO rates. This has a net positive effect on SAS, SATA, + * and SSD. 
* * @author Martyn Cutcher - * + * + * @todo unit tests (this is used by RWStore and so is in general tested as part + * of that class, but it does not have its own test suite and it should not + * be all that difficult to write one, especially if we factor out an API + * for reporting the slotSize and then use a mock object in place of the + * RWStore). + * + * @todo harmonize with {@link CounterSet} for reporting purposes. */ public class BufferedWrite { - final RWStore m_store; - final ByteBuffer m_data; - long m_startAddr = -1; - long m_endAddr = 0; + + /** + * Used to determine the size of the allocation slot onto which a record is + * being written. This is used to pad the size of the IO out to the size of + * the slot. This can improve the IO efficiency When the slots are sized so + * as to fall on multiples of sector boundaries. + */ + private final RWStore m_store; + + /** + * The direct {@link ByteBuffer} used to combine writes which are contiguous + * into a single IO. + */ +// private final ByteBuffer m_data; + private final AtomicReference<ByteBuffer> m_data = new AtomicReference<ByteBuffer>(); + + /** + * The offset on the backing channel at which the data in {@link #m_data} + * will be written when it is flushed to the backing channel. This is + * <code>-1</code> initially (and when reset) as a flag indicating that + * there is no data in {@link #m_data} and that the next record written by + * the caller on the buffer will assign the {@link #m_startAddr starting + * offset} of the data in the buffer. + * <p> + * Guarded by synchronized(this) (paranoia) + */ + private long m_startAddr = -1; + + /** + * The offset of the backing channel at which the next byte would be written + * if it were appended to the data already present in {@link #m_data}. + * <p> + * Guarded by synchronized(this) (paranoia) + */ + private long m_endAddr = 0; - long m_dataBytes = 0; - long m_dataWrites = 0; - long m_fileWrites = 0; + /* + * Counters. 
+ */ + private final CAT m_dataBytes = new CAT(); + private final CAT m_dataWrites = new CAT(); + private final CAT m_fileWrites = new CAT(); public BufferedWrite(final RWStore store) throws InterruptedException { + + if (store == null) + throw new IllegalArgumentException(); + m_store = store; - m_data = DirectBufferPool.INSTANCE.acquire(); + + m_data.set( DirectBufferPool.INSTANCE.acquire() ); + } - + + /** + * Release the direct buffer associated with this object. + * + * @throws InterruptedException + */ +// /* +// * Note: Consider adding synchronized(this) here to guard against the +// * possibility that the buffer could be released (and hence recycled) while +// * a write operation was occurring concurrently, However, this raises the +// * specter that a lock ordering problem could cause a deadlock. +// */ +// synchronized public void release() throws InterruptedException { - DirectBufferPool.INSTANCE.release(m_data); + + final ByteBuffer tmp = m_data.get(); + + if (tmp == null) { + + // Already closed. + return; + + } + + if (m_data.compareAndSet(tmp/* expected */, null/* update */)) { + + DirectBufferPool.INSTANCE.release(tmp); + + } + } - - public int write(final long offset, final ByteBuffer data, final IReopenChannel<FileChannel> opener) throws IOException { - int nwrites = 0; + + /** + * Buffer a write. + * + * @param offset + * The offset on the backing channel at which the data should be + * written. + * @param data + * The data. + * @param opener + * The object which knows how to re-open the backing channel. + * @return The #of write IOs performed during this method call. 
+ * + * @throws IOException + */ + synchronized + public int write(final long offset, final ByteBuffer data, + final IReopenChannel<FileChannel> opener) throws IOException { - m_dataWrites++; + m_dataWrites.increment(); - int data_len = data.remaining(); - int slot_len = m_store.getSlotSize(data_len); + final int data_len = data.remaining(); + final int slot_len = m_store.getSlotSize(data_len); + int nwrites = 0; + final ByteBuffer m_data = this.m_data.get(); if (slot_len > m_data.remaining()) { + /* + * There is not enough room in [m_data] to absorb the caller's data + * record, so we have to flush first. + */ nwrites += flush(opener); } if (m_startAddr == -1) { + /* + * The buffer will begin to absorb data destined for the [offset] + * into the backing channel specified for the caller's data record. + */ m_startAddr = m_endAddr = offset; } else if (m_endAddr != offset) { - // if this is NOT a contiguous write then flush existing content + /* + * If this is NOT a contiguous write then flush existing content. + * After the flush, the buffer will begin to absorb data destined + * for the [offset] into the backing channel specified for the + * caller's data record. + */ nwrites += flush(opener); m_startAddr = m_endAddr = offset; } + // copy the caller's record into the buffer. m_data.put(data); + // update the file offset by the size of the allocation slot m_endAddr += slot_len; - long pos = m_endAddr - m_startAddr; + // update the buffer position by the size of the allocation slot. + final long pos = m_endAddr - m_startAddr; m_data.position((int) pos); return nwrites; } - public int flush(final IReopenChannel<FileChannel> opener) throws IOException { - m_dataBytes += m_data.position(); + /** + * Flush buffered data to the backing channel. + * + * @param opener + * The object which knows how to re-open the backing channel. + * + * @return The #of write IOs performed during this method call. 
+ * + * @throws IOException + */ + synchronized + public int flush(final IReopenChannel<FileChannel> opener) + throws IOException { + + final ByteBuffer m_data = this.m_data.get(); + + if (m_data.remaining() == 0) { + // NOP. + return 0; + } + // increment by the amount of data currently in the buffer. + m_dataBytes.add( m_data.position() ); + + // write out the data in the buffer onto the backing channel. m_data.flip(); final int nwrites = FileChannelUtility.writeAll(opener, m_data, m_startAddr); - m_fileWrites++; - + m_fileWrites.add(nwrites); + reset(); return nwrites; } + + /** + * Reset the buffer position and limit and clear the starting offset on the + * file to <code>-1</code>. + */ + synchronized + public void reset() { + + final ByteBuffer m_data = this.m_data.get(); + + // reset the buffer state. + m_data.position(0); + m_data.limit(m_data.capacity()); + + m_startAddr = -1; + m_endAddr = 0; + } - public String getStats(StringBuffer buf, boolean reset) { - String ret = "BufferedWrites, data: " + m_dataWrites + ", file: " + m_fileWrites + ", bytes: " + m_dataBytes; + public String getStats(final StringBuffer buf, final boolean reset) { + + final String ret = "BufferedWrites, data: " + m_dataWrites + ", file: " + m_fileWrites + ", bytes: " + m_dataBytes; if (buf != null) { buf.append(ret + "\n"); } if (reset) { - m_dataBytes = m_fileWrites = m_dataWrites = 0; + m_dataBytes.set(0L); + m_fileWrites.set(0L); + m_dataWrites.set(0L); } return ret; } - /** - * Caled by flush and also prior to use by the WriteCache. 
- */ - public void reset() { - m_data.position(0); - m_data.limit(m_data.capacity()); - - m_startAddr = -1; - m_endAddr = 0; - } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-12-14 16:50:21 UTC (rev 4006) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-12-14 16:56:43 UTC (rev 4007) @@ -1645,7 +1645,7 @@ * * If there is a BufferedWrite then ensure it is reset. */ - if (m_bufferedWrite == null) { + if (m_bufferedWrite != null) { m_bufferedWrite.reset(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |