From: <tho...@us...> - 2011-05-27 14:40:29
|
Revision: 4557 http://bigdata.svn.sourceforge.net/bigdata/?rev=4557&view=rev Author: thompsonbry Date: 2011-05-27 14:40:22 +0000 (Fri, 27 May 2011) Log Message: ----------- Modified the DirectBufferPool to explicitly track the state of the allocated direct ByteBuffers in order to perform a fast correct rejection when a buffer is double released or when an attempt is made to release a buffer to a different pool. Added more unit tests for the DirectBufferPool. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPool.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java 2011-05-27 13:45:58 UTC (rev 4556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java 2011-05-27 14:40:22 UTC (rev 4557) @@ -2,6 +2,7 @@ import java.nio.ByteBuffer; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -63,6 +64,77 @@ .getLogger(DirectBufferPool.class); /** + * Object tracking state for allocated buffer instances. This is used to + * reject double-release of a buffer back to the pool, which is critical. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private static class BufferState { + + /** + * The buffer instance. + */ + private final ByteBuffer buf; + + /** + * <code>true</code> iff the buffer is currently acquired. + */ + private boolean acquired; + +// /** +// * The #of times this buffer has been acquired. +// */ +// private long nacquired = 0L; + + BufferState(final ByteBuffer buf, final boolean acquired) { + if (buf == null) + throw new IllegalArgumentException(); + this.buf = buf; + this.acquired = acquired; + } + + /** + * The hash code depends only on the object id (NOT the buffer's data). + * <p> + * Note: {@link ByteBuffer#hashCode()} is a very heavy operator whose + * result depends on the data actually in the buffer at the time the + * operation is evaluated! + */ + public int hashCode() { + return super.hashCode(); + } + + /** + * Equality depends only on a reference checks. + * <p> + * Note: {@link ByteBuffer#equals(Object)} is very heavy operator whose + * result depends on the data actually in the buffer at the time the + * operation is evaluated! + */ + public boolean equals(Object o) { + if (this == o) { + // Same BufferState, must be the same buffer. + return true; + } + if (!(o instanceof BufferState)) { + return false; + } + if (this.buf == ((BufferState) o).buf) { + return true; + } + /* + * We have two distinct BufferState references for the same + * ByteBuffer reference. This is an error. There should be a + * one-to-one correspondence. + */ + throw new AssertionError(); + } + + + } + + /** * The name of the buffer pool. */ final private String name; @@ -73,22 +145,18 @@ * Note: This is NOT a weak reference collection since the JVM will leak * native memory. */ - final private BlockingQueue<ByteBuffer> pool; + final private BlockingQueue<BufferState> pool; /** - * Used to recognize {@link ByteBuffer}s allocated by this pool so that - * we can refuse offered buffers that were allocated elsewhere (a - * paranoia feature which could be dropped). + * Used to recognize {@link ByteBuffer}s allocated by this pool so that we + * can refuse offered buffers that were allocated elsewhere (a paranoia + * feature which could be dropped). * <p> - * Note: YOU CAN NOT use a hash-based collection here. hashCode() and - * equals() for a {@link ByteBuffer} are very heavy operations that are - * dependent on the data actually in the buffer at the time the - * operation is evaluated! - * <p> - * Note: if you set [allocated := null] in the ctor then tests of the - * allocated list are disabled. + * Note: {@link LinkedHashSet} is used here for its fast iterator semantics + * since we need to do a linear scan of this collection in + * {@link #getBufferState(ByteBuffer)}. */ - final private List<ByteBuffer> allocated; + final private LinkedHashSet<BufferState> allocated; /** * The number {@link ByteBuffer}s allocated (must use {@link #lock} for @@ -358,10 +426,10 @@ this.bufferCapacity = bufferCapacity; - this.allocated = null; // Note: disables assertion - // this.allocated = new LinkedList<ByteBuffer>(); + // Note: This is required in order to detect double-opens. + this.allocated = new LinkedHashSet<BufferState>(); - this.pool = new LinkedBlockingQueue<ByteBuffer>(poolCapacity); + this.pool = new LinkedBlockingQueue<BufferState>(poolCapacity); pools.add(this); @@ -442,17 +510,19 @@ } // the head of the pool must exist. - final ByteBuffer b = pool.take(); + final BufferState state = pool.take(); + if (state.acquired) + throw new RuntimeException("Buffer already acquired"); + + state.acquired = true; acquired++; totalAcquireCount.increment(); - - assertOurBuffer(b); // limit -> capacity; pos-> 0; mark cleared. - b.clear(); + state.buf.clear(); - return b; + return state.buf; } finally { @@ -515,15 +585,19 @@ try { - assertOurBuffer(b); + final BufferState state = getBufferState(b); - if (pool.contains(b)) + // Check for double-release! + if (!state.acquired) { + log.error("Buffer already released."); throw new IllegalArgumentException("buffer already released."); - + } + // add to the pool. - if(!pool.offer(b, timeout, units)) + if(!pool.offer(state, timeout, units)) return false; + state.acquired = false; acquired--; totalReleaseCount.increment(); @@ -591,15 +665,14 @@ // update the pool size. size++; + // wrap with state metadata. + final BufferState state = new BufferState(b, false/* acquired */); + // add to the set of known buffers - if (allocated != null) { + allocated.add(state); - allocated.add(b); - - } - // add to the pool. - pool.add(b); + pool.add(state); /* * There is now a buffer in the pool and the caller will get it @@ -658,8 +731,9 @@ * {@link DirectBufferPool}. * * @param b + * The buffer. */ - private void assertOurBuffer(final ByteBuffer b) { + private BufferState getBufferState(final ByteBuffer b) { assert lock.isHeldByCurrentThread(); @@ -672,21 +746,20 @@ if(!b.isDirect()) throw new IllegalArgumentException("not direct"); - if (allocated == null) { + /* + * Linear scan for a BufferState object having that ByteBuffer + * reference. + */ + for (BufferState x : allocated) { - // test is disabled. + if (x.buf == b) { + + return x; + + } - return; - } - for (ByteBuffer x : allocated) { - - if (x == b) - return; - - } - throw new IllegalArgumentException("Buffer not allocated by this pool."); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPool.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPool.java 2011-05-27 13:45:58 UTC (rev 4556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPool.java 2011-05-27 14:40:22 UTC (rev 4557) @@ -37,17 +37,6 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ - * - * TODO Unit test to verify that a pool will reject a buffer not - * acquired from that pool. - * - * TODO Write a unit test to verify that buffers (up to the size of the - * pool, but not more than 10) are recycled when they are released - * (that is, they are made available again to subsequent acquires). - * - * <pre> - * final int limit = Math.min(10, DirectBufferPool.INSTANCE.getPoolCapacity()); - * </pre> */ public class TestDirectBufferPool extends TestCase2 { @@ -65,15 +54,13 @@ } @Override - protected void setUp() throws Exception { - super.setUp(); - DirectBufferPoolTestHelper.checkBufferPools(this); - } + protected void tearDown() throws Exception { - @Override - protected void tearDown() throws Exception { + // Verify that all allocated buffers were released. DirectBufferPoolTestHelper.checkBufferPools(this); + super.tearDown(); + } public void test_allocateRelease() throws InterruptedException { @@ -122,7 +109,24 @@ */ public void test_buffersRecycled() throws InterruptedException { + /* + * Acquire/release one buffer before we look at the pool size. This + * should give us at least one available buffer in the pool. That way + * when we run through the allocation loop the pool size should not + * change. + */ + { + ByteBuffer b = null; + try { + b = DirectBufferPool.INSTANCE.acquire(); + } finally { + if (b != null) + DirectBufferPool.INSTANCE.release(b); + } + } + final int poolSizeBefore = DirectBufferPool.INSTANCE.getPoolSize(); + for (int i = 0; i < 10; i++) { ByteBuffer b = null; try { @@ -136,6 +140,9 @@ } } + // pool size remains constant. + assertEquals(poolSizeBefore, DirectBufferPool.INSTANCE.getPoolSize()); + } /** @@ -168,4 +175,33 @@ } + /** + * Unit test to verify that a pool will reject a buffer not acquired from + * that pool. + */ + public void test_rejectBufferFromAnotherPool() throws InterruptedException { + + // A distinct pool with the same buffer capacity + final DirectBufferPool testPool = new DirectBufferPool("test", + 1/* poolCapacity */, DirectBufferPool.INSTANCE + .getBufferCapacity()); + + ByteBuffer b = null; + try { + b = DirectBufferPool.INSTANCE.acquire(); + try { + testPool.release(b); + fail("Release should not be permitted to a different pool. Expecting: " + + IllegalArgumentException.class); + } catch (IllegalArgumentException ex) { + if (log.isInfoEnabled()) + log.info("Ignoring expected exception: " + ex); + } + } finally { + if (b != null) + DirectBufferPool.INSTANCE.release(b); + } + + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |