From: <mar...@us...> - 2011-05-27 15:04:09
|
Revision: 4558 http://bigdata.svn.sourceforge.net/bigdata/?rev=4558&view=rev Author: martyncutcher Date: 2011-05-27 15:04:03 +0000 (Fri, 27 May 2011) Log Message: ----------- Add checkdata method to WriteCache to confirm state of buffer on checksum failure and simple test Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWriteCache.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2011-05-27 14:40:22 UTC (rev 4557) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2011-05-27 15:04:03 UTC (rev 4558) @@ -838,7 +838,7 @@ if (chk != ChecksumUtility.threadChk.get().checksum(b, 0/* offset */, reclen)) { // Note: [offset] is a (possibly relative) file offset. - throw new ChecksumError("offset=" + offset); + throw new ChecksumError(checkdata()); } @@ -1077,6 +1077,59 @@ } /** + * Acquires the buffer (releasing it when done) and, for every entry in the record map, recomputes the checksum of the record data and compares it with the checksum stored in the last 4 bytes of the record; each mismatch is logged as an error. + * @return a summary of the number of records checked and the number of checksum errors found, or a notice that checksums are not enabled + * @throws InterruptedException + * @throws IllegalStateException + */ + public String checkdata() throws IllegalStateException, InterruptedException { + + if (!useChecksum) { + return "Unable to check since checksums are not enabled"; + } + + ByteBuffer tmp = acquire(); + try { + int nerrors = 0; + int nrecords = recordMap.size(); + + for (Entry<Long, RecordMetadata> ent : recordMap.entrySet()) { + RecordMetadata md = ent.getValue(); + + // length of the record w/o checksum field. + final int reclen = md.recordLength - 4; + + // the start of the record in writeCache. + final int pos = md.bufferOffset; + + final int chk = tmp.getInt(pos + reclen); + + // create a view with same offset, limit and position. 
+ final ByteBuffer view = tmp.duplicate(); + + // adjust the view to just the record of interest. + view.limit(pos + reclen); + view.position(pos); + + final byte[] b = new byte[reclen]; + + final ByteBuffer dst = ByteBuffer.wrap(b); + + // copy the data into [dst] (and the backing byte[]). + dst.put(view); + if (chk != ChecksumUtility.threadChk.get().checksum(b, 0/* offset */, reclen)) { + log.error("Bad data for address: " + ent.getKey()); + nerrors++; + } + + } + return "WriteCache checkdata - records: " + nrecords + ", errors: " + nerrors; + } finally { + release(); + } + } + + /** * Write the data from the buffer onto the channel. This method provides a * uniform means to request that the buffer write itself onto the backing * channel, regardless of whether the channel is backed by a file, a socket, Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWriteCache.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWriteCache.java 2011-05-27 14:40:22 UTC (rev 4557) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/writecache/TestWriteCache.java 2011-05-27 15:04:03 UTC (rev 4558) @@ -44,6 +44,7 @@ import com.bigdata.io.TestCase3; import com.bigdata.io.writecache.WriteCache; import com.bigdata.rawstore.Bytes; +import com.bigdata.util.ChecksumError; import com.bigdata.util.ChecksumUtility; /** @@ -85,7 +86,62 @@ * contents and metadata to the disk as necessary. */ final private static String mode = "rw"; + + /** + * Confirm checksum errors + */ + public void test_writeCacheChecksums() { + + try { + final File file = File.createTempFile(getName(), ".tmp"); + + final boolean isHighlyAvailable = false; + + final ReopenFileChannel opener = new ReopenFileChannel(file, mode); + + final ByteBuffer buf = DirectBufferPool.INSTANCE.acquire(); + + try { + + // The buffer size must be at least 1k for these tests. 
+ assertTrue(DirectBufferPool.INSTANCE.getBufferCapacity() >= Bytes.kilobyte32); + + WriteCache writeCache = new WriteCache.FileChannelWriteCache(0, buf, + true, isHighlyAvailable, false, opener); + + + long addr1 = 0; + long addr2 = 12800; + long addr3 = 24800; + ByteBuffer data1 = getRandomData(512); + int chk1 = ChecksumUtility.threadChk.get().checksum(data1, 0/* offset */, data1.limit()); + + writeCache.write(addr1, data1, chk1); + data1.flip(); + writeCache.write(addr2, data1, 23); // bad checksum + data1.flip(); + writeCache.write(addr3, data1, chk1); // good checksum (same data and checksum as addr1) + + writeCache.read(addr1); + writeCache.read(addr3); + + try { + writeCache.read(addr2); + + fail("Expected ChecksumError"); + } catch (ChecksumError ce) { + System.out.println("Expected: " + ce.getMessage()); + } + + } finally { + DirectBufferPool.INSTANCE.release(buf); + } + } catch (Exception e) { + fail("Unexpected exception", e); + } + } + /** * Exercises most of the API. * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |