From: <tho...@us...> - 2011-05-27 13:28:13
|
Revision: 4555 http://bigdata.svn.sourceforge.net/bigdata/?rev=4555&view=rev Author: thompsonbry Date: 2011-05-27 13:28:06 +0000 (Fri, 27 May 2011) Log Message: ----------- Removed support for the "asCommittedView" from AbstractTripleStore and the unit test for that support. This feature was not getting any use and had odd semantics. Added unit test for the DirectBufferPool to verify that it the acquire/release logic. Added assertion to the test suites to verify that acquired direct buffers are released. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPool.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestHelper.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalTripleStoreTransactionSemantics.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/DirectBufferPoolTestHelper.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java 2011-05-26 13:25:06 UTC (rev 4554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPool.java 2011-05-27 13:28:06 UTC (rev 4555) @@ -14,10 +14,11 @@ import org.apache.log4j.Logger; +import com.bigdata.counters.CAT; import com.bigdata.counters.CounterSet; import com.bigdata.counters.Instrument; import com.bigdata.counters.OneShotInstrument; -import com.bigdata.journal.DiskOnlyStrategy; +import com.bigdata.journal.IBufferStrategy; import com.bigdata.journal.TemporaryRawStore; import com.bigdata.journal.TransientBufferStrategy; import com.bigdata.rawstore.Bytes; @@ -36,7 +37,7 @@ * direct buffer for the operation which transfers the data from the * {@link TransientBufferStrategy} to disk. Therefore the data is copied into a * temporary buffer allocated from this pool and then the buffer is either - * handed off to the {@link DiskOnlyStrategy} for use as its write cache (in + * handed off to the {@link IBufferStrategy} for use as its write cache (in * which case the {@link TemporaryRawStore} holds a reference to the buffer and * releases it back to those pool when it is finalized) or the buffer is * immediately released back to this pool. @@ -65,8 +66,10 @@ * The name of the buffer pool. */ final private String name; - + /** + * A pool of direct {@link ByteBuffer}s which may be acquired. + * <p> * Note: This is NOT a weak reference collection since the JVM will leak * native memory. */ @@ -89,11 +92,22 @@ /** * The number {@link ByteBuffer}s allocated (must use {@link #lock} for - * updates or reads to be atomic). + * updates or reads to be atomic). This counter is incremented each time a + * buffer is allocated. Since we do not free buffers when they are released + * (to prevent an effective JVM memory leak) this counter is never + * decremented. */ private int size = 0; /** + * The #of {@link ByteBuffer}s which are currently acquired (must use + * {@link #lock} for updates or reads to be atomic). This counter is + * incremented when a buffer is acquired and decremented when a buffer + * is released. + */ + private int acquired = 0; + + /** * The maximum #of {@link ByteBuffer}s that will be allocated. */ private final int poolCapacity; @@ -114,11 +128,40 @@ private final Condition bufferRelease = lock.newCondition(); /** + * Package private counter of the total #of acquired buffers in all pools. + * This is used to check for memory leaks in the test suites. The value is + * reset before/after each test. + */ + static final CAT totalAcquireCount = new CAT(); + static final CAT totalReleaseCount = new CAT(); + + /** * The name of this buffer pool instance. */ public String getName() { return name; } + + /** + * The #of {@link ByteBuffer}s which are currently acquired. This counter is + * incremented when a buffer is acquired and decremented when a buffer is + * released. + */ + public int getAcquiredBufferCount() { + + lock.lock(); + + try { + + return acquired; + + } finally { + + lock.unlock(); + + } + + } /** * The capacity of the buffer as specified to the ctor. @@ -355,7 +398,7 @@ // The TimeoutException should not be thrown. throw new AssertionError(e); - + } } @@ -401,6 +444,9 @@ // the head of the pool must exist. final ByteBuffer b = pool.take(); + acquired++; + totalAcquireCount.increment(); + assertOurBuffer(b); // limit -> capacity; pos-> 0; mark cleared. @@ -454,6 +500,9 @@ if(!pool.offer(b, timeout, units)) return false; + acquired--; + totalReleaseCount.increment(); + /* * Signal ONE thread that there is a buffer available. * @@ -631,6 +680,8 @@ int bufferPoolCount = 0; // #of buffers currently allocated across all buffer pools. int bufferInUseCount = 0; + // #of buffers currently acquired across all buffer pools. + int totalAcquired = 0; // #of bytes currently allocated across all buffer pools. final AtomicLong totalBytesUsed = new AtomicLong(0L); // For each buffer pool. @@ -643,11 +694,14 @@ final int poolCapacity = p.getPoolCapacity(); final int bufferCapacity = p.getBufferCapacity(); + + final int acquired = p.getAcquiredBufferCount(); final long bytesUsed = poolSize * bufferCapacity; bufferPoolCount++; bufferInUseCount += poolSize; + totalAcquired += acquired; totalBytesUsed.addAndGet(bytesUsed); c.addCounter("poolCapacity", new OneShotInstrument<Integer>( @@ -656,6 +710,12 @@ c.addCounter("bufferCapacity", new OneShotInstrument<Integer>( bufferCapacity)); + c.addCounter("acquired", new Instrument<Integer>() { + public void sample() { + setValue(acquired); + } + }); + c.addCounter("poolSize", new Instrument<Integer>() { public void sample() { setValue(poolSize); @@ -676,7 +736,10 @@ /* * Totals. */ - + + tmp.addCounter("totalAcquired", new OneShotInstrument<Integer>( + totalAcquired)); + tmp.addCounter("bufferPoolCount", new OneShotInstrument<Integer>( bufferPoolCount)); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/DirectBufferPoolTestHelper.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/DirectBufferPoolTestHelper.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/DirectBufferPoolTestHelper.java 2011-05-27 13:28:06 UTC (rev 4555) @@ -0,0 +1,93 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on May 27, 2011 + */ + +package com.bigdata.io; + +import junit.extensions.proxy.IProxyTest; +import junit.framework.Assert; +import junit.framework.TestCase; + +/** + * Some helper methods for CI. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class DirectBufferPoolTestHelper { + + /** + * Verify that any buffers acquired by the test have been released. + * <p> + * Note: This clears the counter as a side effect to prevent a cascade + * of tests from being failed. + */ + public static void checkBufferPools(final TestCase test) { + + checkBufferPools(test, null/*delegate*/); + + } + + /** + * Verify that any buffers acquired by the test have been released (variant + * when using an {@link IProxyTest}). + * <p> + * Note: This clears the counter as a side effect to prevent a cascade of + * tests from being failed. + * + * @param test + * The unit test instance. + * @param testClass + * The instance of the delegate test class for a proxy test + * suite. For example, TestWORMStrategy. + */ + public static void checkBufferPools(final TestCase test, + final TestCase testClass) { + + final long nacquired = DirectBufferPool.totalAcquireCount.get(); + final long nreleased = DirectBufferPool.totalReleaseCount.get(); + DirectBufferPool.totalAcquireCount.set(0L); + DirectBufferPool.totalReleaseCount.set(0L); + + if (nacquired != nreleased) { + + /* + * At least one buffer was acquired which was never released. + */ + + Assert.fail("Test did not release buffer(s)"// + + ": nacquired=" + nacquired // + + ", nreleased=" + nreleased // + + ", test=" + test.getClass() + "." + test.getName()// + + (testClass == null ? "" : ", testClass=" + + testClass.getClass().getName())// + ); + + } + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/DirectBufferPoolTestHelper.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPool.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPool.java 2011-05-26 13:25:06 UTC (rev 4554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPool.java 2011-05-27 13:28:06 UTC (rev 4555) @@ -28,6 +28,8 @@ package com.bigdata.io; +import java.nio.ByteBuffer; + import junit.framework.TestCase; /** @@ -51,11 +53,45 @@ super(arg0); } - /** @todo write tests. */ - public void test_nothing() { - -// fail("No tests written yet."); - + @Override + protected void setUp() throws Exception { + super.setUp(); + DirectBufferPoolTestHelper.checkBufferPools(this); } - + + @Override + protected void tearDown() throws Exception { + DirectBufferPoolTestHelper.checkBufferPools(this); + super.tearDown(); + } + + public void test_allocateRelease() throws InterruptedException { + + final int poolSizeBefore = DirectBufferPool.INSTANCE.getPoolSize(); + final int poolAcquiredBefore = DirectBufferPool.INSTANCE + .getAcquiredBufferCount(); + + final ByteBuffer b = DirectBufferPool.INSTANCE.acquire(); + + final int poolSizeDuring = DirectBufferPool.INSTANCE.getPoolSize(); + final int poolAcquiredDuring = DirectBufferPool.INSTANCE + .getAcquiredBufferCount(); + + assertEquals(poolSizeBefore + 1, poolSizeDuring); + assertEquals(poolAcquiredBefore + 1, poolAcquiredDuring); + + DirectBufferPool.INSTANCE.release(b); + + final int poolSizeAfter = DirectBufferPool.INSTANCE.getPoolSize(); + final int poolAcquiredAfter = DirectBufferPool.INSTANCE + .getAcquiredBufferCount(); + + // the pool size does not decrease. + assertEquals(poolSizeBefore + 1, poolSizeAfter); + + // the #of acquired buffers does decrease. + assertEquals(poolAcquiredBefore, poolAcquiredAfter); + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestHelper.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestHelper.java 2011-05-26 13:25:06 UTC (rev 4554) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/journal/TestHelper.java 2011-05-27 13:28:06 UTC (rev 4555) @@ -27,6 +27,8 @@ package com.bigdata.journal; +import com.bigdata.io.DirectBufferPoolTestHelper; + import junit.extensions.proxy.IProxyTest; import junit.framework.Assert; import junit.framework.TestCase; @@ -107,6 +109,10 @@ } + // Also check the direct buffer pools. + DirectBufferPoolTestHelper.checkBufferPools(test, testClass); + + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2011-05-26 13:25:06 UTC (rev 4554) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2011-05-27 13:28:06 UTC (rev 4555) @@ -1851,65 +1851,65 @@ if (isReadOnly()) throw new IllegalStateException(); - /* - * Clear the reference since it was as of the last commit point. - */ - readCommittedRef = null; +// /* +// * Clear the reference since it was as of the last commit point. +// */ +// readCommittedRef = null; return 0l; } - /** - * A factory returning a read-committed view of the database. - * <p> - * Note: There is a distinct instance <i>per commit time</i>. If an - * intervening commit has occurred, then you will get back a new instance - * providing a read-consistent view as of the now most recent commit point. - * - * FIXME The [per commit time] constraint is actually a function of the - * {@link ITx#READ_COMMITTED} semantics as implemented by the - * {@link IIndexManager}. If the indices are {@link IClientIndex}s then - * the instances remain valid since all requests are delegated through the - * {@link DataService} layer. However, if they are {@link BTree}s, then the - * instances are NOT valid. - * <p> - * Perhaps the best way to deal with this is to have a ReadCommittedBTree or - * to modify BTree to intrinsically understand read-committed semantics and - * to reload from the most recent checkpoint after each commit. That way the - * index references would always remain valid. - * <p> - * However, we have to be much more careful about read-consistent (choose a - * timestamp corresponding to the last commit point or the last closure - * point) vs read-committed (writes become immediately visible once they are - * committed). - */ - final public AbstractTripleStore asReadCommittedView() { - - if (getTimestamp() == ITx.READ_COMMITTED) { - - return this; - - } - - synchronized(this) { - - AbstractTripleStore view = readCommittedRef == null ? null - : readCommittedRef.get(); - - if(view == null) { - - view = (AbstractTripleStore) getIndexManager().getResourceLocator() - .locate(getNamespace(), ITx.READ_COMMITTED); - - readCommittedRef = new SoftReference<AbstractTripleStore>(view); - - } - - return view; - - } - - } - private SoftReference<AbstractTripleStore> readCommittedRef; +// /** +// * A factory returning a read-committed view of the database. +// * <p> +// * Note: There is a distinct instance <i>per commit time</i>. If an +// * intervening commit has occurred, then you will get back a new instance +// * providing a read-consistent view as of the now most recent commit point. +// * +// * FIXME The [per commit time] constraint is actually a function of the +// * {@link ITx#READ_COMMITTED} semantics as implemented by the +// * {@link IIndexManager}. If the indices are {@link IClientIndex}s then +// * the instances remain valid since all requests are delegated through the +// * {@link DataService} layer. However, if they are {@link BTree}s, then the +// * instances are NOT valid. +// * <p> +// * Perhaps the best way to deal with this is to have a ReadCommittedBTree or +// * to modify BTree to intrinsically understand read-committed semantics and +// * to reload from the most recent checkpoint after each commit. That way the +// * index references would always remain valid. +// * <p> +// * However, we have to be much more careful about read-consistent (choose a +// * timestamp corresponding to the last commit point or the last closure +// * point) vs read-committed (writes become immediately visible once they are +// * committed). +// */ +// final public AbstractTripleStore asReadCommittedView() { +// +// if (getTimestamp() == ITx.READ_COMMITTED) { +// +// return this; +// +// } +// +// synchronized(this) { +// +// AbstractTripleStore view = readCommittedRef == null ? null +// : readCommittedRef.get(); +// +// if(view == null) { +// +// view = (AbstractTripleStore) getIndexManager().getResourceLocator() +// .locate(getNamespace(), ITx.READ_COMMITTED); +// +// readCommittedRef = new SoftReference<AbstractTripleStore>(view); +// +// } +// +// return view; +// +// } +// +// } +// private SoftReference<AbstractTripleStore> readCommittedRef; final public long getJustificationCount() { Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalTripleStoreTransactionSemantics.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalTripleStoreTransactionSemantics.java 2011-05-26 13:25:06 UTC (rev 4554) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/store/TestLocalTripleStoreTransactionSemantics.java 2011-05-27 13:28:06 UTC (rev 4555) @@ -58,64 +58,64 @@ } } - /** - * Test the commit semantics in the context of a read-committed view of the - * database. - */ - public void test_commit() { - - final LocalTripleStore store = (LocalTripleStore) getStore(); - - try { - - // read-committed view of the same database. - final AbstractTripleStore view = store.asReadCommittedView(); - - final IV s = new TermId(VTE.URI, 1); - final IV p = new TermId(VTE.URI, 2); - final IV o = new TermId(VTE.URI, 3); - - // add the statement. - store.addStatements(new SPO[] { // - new SPO(s, p, o, StatementEnum.Explicit) // - },// - 1); - - final boolean stmtInStore = store.hasStatement(s, p, o); - - if(log.isInfoEnabled()) log.info("stmtInStore: " + stmtInStore); - - final boolean stmtInView = view.hasStatement(s, p, o); - - if(log.isInfoEnabled()) log.info("stmtInView: " + stmtInView); - - // visible in the repo. - assertTrue(stmtInStore); - - // not visible in the view. - assertFalse(stmtInView); - - // commit the transaction. - store.commit(); - - // now visible in the view - /* - * Note: this will fail if the Journal#getIndex(name,timestamp) does - * not return an index view with read-committed (vs read-consistent) - * semantics. For the index view to have read-committed semantics - * the view MUST update if there is an intervening commit. This is - * currently handled by returning a ReadCommittedView for this case - * rather than a BTree. - */ - assertTrue(view.hasStatement(s, p, o)); - - } finally { - - store.__tearDownUnitTest(); - - } - - } +// /** +// * Test the commit semantics in the context of a read-committed view of the +// * database. +// */ +// public void test_commit() { +// +// final LocalTripleStore store = (LocalTripleStore) getStore(); +// +// try { +// +// // read-committed view of the same database. +// final AbstractTripleStore view = store.asReadCommittedView(); +// +// final IV s = new TermId(VTE.URI, 1); +// final IV p = new TermId(VTE.URI, 2); +// final IV o = new TermId(VTE.URI, 3); +// +// // add the statement. +// store.addStatements(new SPO[] { // +// new SPO(s, p, o, StatementEnum.Explicit) // +// },// +// 1); +// +// final boolean stmtInStore = store.hasStatement(s, p, o); +// +// if(log.isInfoEnabled()) log.info("stmtInStore: " + stmtInStore); +// +// final boolean stmtInView = view.hasStatement(s, p, o); +// +// if(log.isInfoEnabled()) log.info("stmtInView: " + stmtInView); +// +// // visible in the repo. +// assertTrue(stmtInStore); +// +// // not visible in the view. +// assertFalse(stmtInView); +// +// // commit the transaction. +// store.commit(); +// +// // now visible in the view +// /* +// * Note: this will fail if the Journal#getIndex(name,timestamp) does +// * not return an index view with read-committed (vs read-consistent) +// * semantics. For the index view to have read-committed semantics +// * the view MUST update if there is an intervening commit. This is +// * currently handled by returning a ReadCommittedView for this case +// * rather than a BTree. +// */ +// assertTrue(view.hasStatement(s, p, o)); +// +// } finally { +// +// store.__tearDownUnitTest(); +// +// } +// +// } /** * Test of abort semantics. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |