From: <tho...@us...> - 2011-06-09 15:15:17
|
Revision: 4655 http://bigdata.svn.sourceforge.net/bigdata/?rev=4655&view=rev Author: thompsonbry Date: 2011-06-09 15:15:11 +0000 (Thu, 09 Jun 2011) Log Message: ----------- Passing changes back to martyn Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/IBufferAccess.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPoolAllocator.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java 2011-06-09 15:02:21 UTC (rev 4654) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java 2011-06-09 15:15:11 UTC (rev 4655) @@ -1,9 +1,7 @@ package com.bigdata.io; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -74,14 +72,18 @@ private class BufferState implements IBufferAccess { /** - * The buffer instance. + * The buffer instance. This is guarded by the monitor of the + * {@link BufferState} object. */ private ByteBuffer buf; BufferState(final ByteBuffer buf) { + if (buf == null) throw new IllegalArgumentException(); + this.buf = buf; + } /** @@ -123,34 +125,39 @@ // Implement IDirectBuffer methods public ByteBuffer buffer() { - return buf; + synchronized(this) { + return buf; + } } public void release() throws InterruptedException { release(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } - public void release(long timeout, TimeUnit units) throws InterruptedException { - if (buf != null) { - DirectBufferPool.this.release(buf, timeout, units); - buf = null; - } else { - throw new IllegalStateException("Buffer has already been released"); - } + public void release(long timeout, TimeUnit units) + throws InterruptedException { + + synchronized (this) { + if (buf == null) { + throw new IllegalStateException(); + } + DirectBufferPool.this.release(buf, timeout, units); + buf = null; + } + } protected void finalize() { if (buf != null) { try { + log.error("Buffer release on finalize"); DirectBufferPool.this.release(buf); } catch (InterruptedException e) { // ignore } - log.warn("Buffer released on finalize"); } } - } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/IBufferAccess.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/IBufferAccess.java 2011-06-09 15:02:21 UTC (rev 4654) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/IBufferAccess.java 2011-06-09 15:15:11 UTC (rev 4655) @@ -27,11 +27,45 @@ import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +/** + * Interface for access to and release of a direct {@link ByteBuffer} managed by + * the {@link DirectBufferPool}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ public interface IBufferAccess { - // return the byte buffer - public ByteBuffer buffer(); - - // release the ByteBuffer, returning to owning pool - public void release() throws InterruptedException; - public void release(long time, TimeUnit unit) throws InterruptedException; + + /** + * Return the direct {@link ByteBuffer}. + * <p> + * <strong>Caution:</strong> DO NOT hold onto a reference to the returned + * {@link ByteBuffer} without also retaining the {@link IBufferAccess} + * object. This can cause the backing {@link ByteBuffer} to be returned to + * the pool, after which it may be handed off to another thread leading to + * data corruption through concurrent modification to the backing bytes! + * + * @throws IllegalStateException + * if the buffer has been released. + */ + public ByteBuffer buffer(); + + /** + * Release the {@link ByteBuffer}, returning to owning pool. + * + * @throws IllegalStateException + * if the buffer has been released. + */ + public void release() throws InterruptedException; + + /** + * Release the {@link ByteBuffer}, returning to owning pool. + * + * @param time + * @param unit + * @throws IllegalStateException + * if the buffer has been released. + */ + public void release(long time, TimeUnit unit) throws InterruptedException; + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPoolAllocator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPoolAllocator.java 2011-06-09 15:02:21 UTC (rev 4654) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPoolAllocator.java 2011-06-09 15:15:11 UTC (rev 4655) @@ -56,8 +56,18 @@ super(name); } - final DirectBufferPool pool = DirectBufferPool.INSTANCE; + private final DirectBufferPool pool = DirectBufferPool.INSTANCE; + @Override + protected void tearDown() throws Exception { + + // Verify that all allocated buffers were released. + DirectBufferPoolTestHelper.checkBufferPools(this); + + super.tearDown(); + + } + /** * Opens and closes the allocator. */ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |