From: <tho...@us...> - 2011-06-17 12:30:59
|
Revision: 4719 http://bigdata.svn.sourceforge.net/bigdata/?rev=4719&view=rev Author: thompsonbry Date: 2011-06-17 12:30:51 +0000 (Fri, 17 Jun 2011) Log Message: ----------- javadoc on WriteCache (removed mentions of the defunct latch). test harness modifications to TestMROWTransactions. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestMROWTransactions.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2011-06-17 11:45:06 UTC (rev 4718) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2011-06-17 12:30:51 UTC (rev 4719) @@ -129,11 +129,11 @@ * prevents {@link #acquire()} during critical sections such as * {@link #flush(boolean, long, TimeUnit)}, {@link #reset()}, and * {@link #close()}. - * <p> - * Note: To avoid lock ordering problems, acquire the read lock before you - * increment the latch and acquire the write lock before you await the - * latch. */ +// * <p> +// * Note: To avoid lock ordering problems, acquire the read lock before you +// * increment the latch and acquire the write lock before you await the +// * latch. final private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); /** @@ -1287,11 +1287,11 @@ * reuse it to receive more writes. * <p> * Note: Keep private unless strong need for override since you can not call - * this method without holding the write lock and having the {@link #latch} - * at zero. + * this method without holding the write lock * * @param tmp */ + // ... and having the {@link #latch} at zero. private void _resetState(final ByteBuffer tmp) { if (tmp == null) @@ -1805,7 +1805,7 @@ * with a full buffer where there is not room for the dummy "remove" prefix. * Whilst we could of course ensure that a buffer with less than the space * required for prefixWrites should be moved immediately to the dirtlyList, - * there would still exist the possibillity that the clear could be + * there would still exist the possibility that the clear could be * requested on a buffer already on the dirtyList. It looks like this should * not matter, since each buffer update can be considered as an atomic * update even if the set of writes are individually not atomic (the updates @@ -1826,7 +1826,7 @@ * @throws InterruptedException * @throws IllegalStateException */ - public void clearAddrMap(final long addr) throws IllegalStateException, InterruptedException { + /*public*/ void clearAddrMap(final long addr) throws IllegalStateException, InterruptedException { final RecordMetadata entry = recordMap.remove(addr); if (prefixWrites) { // final int pos = entry.bufferOffset - 12; @@ -1876,7 +1876,7 @@ } protected void registerWriteStatus(long offset, int length, char action) { - // NOP to be overidden for debug if required + // NOP to be overridden for debug if required } boolean m_written = false; Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestMROWTransactions.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestMROWTransactions.java 2011-06-17 11:45:06 UTC (rev 4718) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestMROWTransactions.java 2011-06-17 12:30:51 UTC (rev 4719) @@ -6,7 +6,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.openrdf.model.BNode; @@ -18,8 +17,6 @@ import org.openrdf.model.impl.ContextStatementImpl; import org.openrdf.model.impl.StatementImpl; import org.openrdf.model.impl.URIImpl; -import org.openrdf.repository.RepositoryConnection; -import org.openrdf.repository.RepositoryResult; import com.bigdata.counters.CAT; import com.bigdata.journal.BufferMode; @@ -31,6 +28,7 @@ import com.bigdata.rdf.store.BD; import com.bigdata.rdf.store.BigdataStatementIterator; import com.bigdata.rdf.vocab.NoVocabulary; +import com.bigdata.util.InnerCause; import com.bigdata.util.concurrent.DaemonThreadFactory; /** @@ -77,6 +75,14 @@ } + protected void setUp() throws Exception { + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + private URI uri(String s) { return new URIImpl(BD.NAMESPACE + s); } @@ -93,110 +99,110 @@ return new ContextStatementImpl(s, p, o, c); } - public void test_multiple_transaction() throws Exception { +// public void test_multiple_transaction() throws Exception { +// +// final int nthreads = 10; // +// final int nuris = 2000; // +// final int npreds = 50; // +// final Random r = new Random(); +// +// ExecutorService writers = Executors.newSingleThreadExecutor(DaemonThreadFactory.defaultThreadFactory()); +// ExecutorService readers = Executors.newFixedThreadPool(nthreads, DaemonThreadFactory.defaultThreadFactory()); +// +// final BigdataSail sail = getSail(); +// final URI[] subs = new URI[nuris]; +// for (int i = 0; i < nuris; i++) { +// subs[i] = uri("uri:" + i); +// } +// final URI[] preds = new URI[npreds]; +// for (int i = 0; i < npreds; i++) { +// preds[i] = uri("pred:" + i); +// } +// final AtomicInteger writes = new AtomicInteger(); +// final AtomicInteger reads = new AtomicInteger(); +// try { +// sail.initialize(); +// final BigdataSailRepository repo = new BigdataSailRepository(sail); +// +// // Writer task adds nwrites statements then commits +// class Writer implements Callable<Long> { +// final int nwrites; +// +// Writer(final int nwrites) { +// this.nwrites = nwrites; +// } +// +// public Long call() throws Exception { +// final RepositoryConnection tx1 = repo.getReadWriteConnection(); +// try { +// tx1.setAutoCommit(false); +// +// for (int i = 0; i < nwrites; i++) { +// tx1.add(stmt(subs[r.nextInt(500)], preds[r.nextInt(20)], subs[r.nextInt(500)])); +// writes.incrementAndGet(); +// } +// tx1.commit(); +// +// } finally { +// tx1.close(); +// } +// +// return null; +// } +// +// } +// +// // ReaderTask makes nreads and closes +// class Reader implements Callable<Long> { +// final int nreads; +// +// Reader(final int nwrites) { +// this.nreads = nwrites; +// } +// +// public Long call() throws Exception { +// final RepositoryConnection tx1 = repo.getReadOnlyConnection(); +// try { +// +// for (int i = 0; i < nreads; i++) { +// RepositoryResult<Statement> stats = tx1.getStatements(subs[r.nextInt(500)], null, null, true); +// while (stats.hasNext()) { +// stats.next(); +// reads.incrementAndGet(); +// } +// } +// +// } finally { +// tx1.close(); +// } +// +// return null; +// } +// +// } +// +// // let's schedule a few writers and readers +// for (int i = 0; i < 500; i++) { +// writers.submit(new Writer(500)); +// for (int rdrs = 0; rdrs < 20; rdrs++) { +// readers.submit(new Reader(50)); +// } +// } +// +// Thread.sleep(60 * 1000); +// writers.shutdownNow(); +// readers.shutdownNow(); +// writers.awaitTermination(5, TimeUnit.SECONDS); +// readers.awaitTermination(5, TimeUnit.SECONDS); +// System.out.println("Statements written: " + writes.get() + ", read: " + reads.get()); +// } finally { +// +// sail.__tearDownUnitTest(); +// +// } +// +// } - final int nthreads = 10; // - final int nuris = 2000; // - final int npreds = 50; // - final Random r = new Random(); - - ExecutorService writers = Executors.newSingleThreadExecutor(DaemonThreadFactory.defaultThreadFactory()); - ExecutorService readers = Executors.newFixedThreadPool(nthreads, DaemonThreadFactory.defaultThreadFactory()); - - final BigdataSail sail = getSail(); - final URI[] subs = new URI[nuris]; - for (int i = 0; i < nuris; i++) { - subs[i] = uri("uri:" + i); - } - final URI[] preds = new URI[npreds]; - for (int i = 0; i < npreds; i++) { - preds[i] = uri("pred:" + i); - } - final AtomicInteger writes = new AtomicInteger(); - final AtomicInteger reads = new AtomicInteger(); - try { - sail.initialize(); - final BigdataSailRepository repo = new BigdataSailRepository(sail); - - // Writer task adds nwrites statements then commits - class Writer implements Callable<Long> { - final int nwrites; - - Writer(final int nwrites) { - this.nwrites = nwrites; - } - - public Long call() throws Exception { - final RepositoryConnection tx1 = repo.getReadWriteConnection(); - try { - tx1.setAutoCommit(false); - - for (int i = 0; i < nwrites; i++) { - tx1.add(stmt(subs[r.nextInt(500)], preds[r.nextInt(20)], subs[r.nextInt(500)])); - writes.incrementAndGet(); - } - tx1.commit(); - - } finally { - tx1.close(); - } - - return null; - } - - } - - // ReaderTask makes nreads and closes - class Reader implements Callable<Long> { - final int nreads; - - Reader(final int nwrites) { - this.nreads = nwrites; - } - - public Long call() throws Exception { - final RepositoryConnection tx1 = repo.getReadOnlyConnection(); - try { - - for (int i = 0; i < nreads; i++) { - RepositoryResult<Statement> stats = tx1.getStatements(subs[r.nextInt(500)], null, null, true); - while (stats.hasNext()) { - stats.next(); - reads.incrementAndGet(); - } - } - - } finally { - tx1.close(); - } - - return null; - } - - } - - // let's schedule a few writers and readers - for (int i = 0; i < 500; i++) { - writers.submit(new Writer(500)); - for (int rdrs = 0; rdrs < 20; rdrs++) { - readers.submit(new Reader(50)); - } - } - - Thread.sleep(60 * 1000); - writers.shutdownNow(); - readers.shutdownNow(); - writers.awaitTermination(5, TimeUnit.SECONDS); - readers.awaitTermination(5, TimeUnit.SECONDS); - System.out.println("Statements written: " + writes.get() + ", read: " + reads.get()); - } finally { - - sail.__tearDownUnitTest(); - - } - - } - // similar to test_multiple_transactions but uses direct AbsractTripleStore // manipulations rather than RepositoryConnections public void test_multiple_csem_transaction() throws Exception { @@ -208,7 +214,7 @@ * session protection. If the protocol works correctly we should never * release session protection if any transaction has been initialized. * - * The mesage of "invalid address" would be generated if an allocation + * The message of "invalid address" would be generated if an allocation * has been freed and is no longer protected from recycling when an * attempt is made to read from it. */ @@ -218,29 +224,26 @@ final int npreds = 50; // final Random r = new Random(); - ExecutorService writers = Executors.newSingleThreadExecutor(DaemonThreadFactory.defaultThreadFactory()); - ExecutorService readers = Executors.newFixedThreadPool(nthreads, DaemonThreadFactory.defaultThreadFactory()); - - - final BigdataSail sail = getSail(); - sail.initialize(); - final BigdataSailRepository repo = new BigdataSailRepository(sail); - final AbstractTripleStore origStore = repo.getDatabase(); - - final URI[] subs = new URI[nuris]; - for (int i = 0; i < nuris; i++) { - subs[i] = uri("uri:" + i); - } - final URI[] preds = new URI[npreds]; - for (int i = 0; i < npreds; i++) { - preds[i] = uri("pred:" + i); - } final CAT writes = new CAT(); final CAT reads = new CAT(); - final AtomicReference<Exception> failex = new AtomicReference<Exception>(null); + final AtomicReference<Throwable> failex = new AtomicReference<Throwable>(null); + final BigdataSail sail = getSail(); try { - // Writer task adds nwrites statements then commits + sail.initialize(); + final BigdataSailRepository repo = new BigdataSailRepository(sail); + final AbstractTripleStore origStore = repo.getDatabase(); + + final URI[] subs = new URI[nuris]; + for (int i = 0; i < nuris; i++) { + subs[i] = uri("uri:" + i); + } + final URI[] preds = new URI[npreds]; + for (int i = 0; i < npreds; i++) { + preds[i] = uri("pred:" + i); + } + + // Writer task adds nwrites statements then commits class Writer implements Callable<Long> { final int nwrites; @@ -251,7 +254,7 @@ public Long call() throws Exception { try { final boolean isQuads = origStore.isQuads(); - // Thread.sleep(r.nextInt(2000) + 500); + Thread.sleep(r.nextInt(2000) + 500); try { for (int i = 0; i < nwrites; i++) { @@ -273,11 +276,19 @@ log.info("Commit"); } } - } catch (IllegalStateException ise) { - failex.compareAndSet(null, ise); - log.error(ise, ise); - } catch (Throwable t) { - log.error(t, t); + } catch (Throwable ise) { + if (!InnerCause.isInnerCause(ise, + InterruptedException.class)) { + if (failex + .compareAndSet(null/* expected */, ise/* newValue */)) { + log.error("firstCause:" + ise, ise); + } else { + if (log.isInfoEnabled()) + log.info("Other error: " + ise, ise); + } + } else { + // Ignore. + } } return null; } @@ -298,12 +309,12 @@ .getIndexManager()).newTx(ITx.READ_COMMITTED); try { - AbstractTripleStore readstore = (AbstractTripleStore) origStore + final AbstractTripleStore readstore = (AbstractTripleStore) origStore .getIndexManager().getResourceLocator() .locate(origStore.getNamespace(), txId); for (int i = 0; i < nreads; i++) { - BigdataStatementIterator stats = readstore + final BigdataStatementIterator stats = readstore .getStatements(subs[r.nextInt(nuris)], null, null); while (stats.hasNext()) { @@ -320,33 +331,50 @@ } catch (Throwable t) { log.error(t, t); } - return null; - } + return null; + } - } + } - // let's schedule a few writers and readers (more than needed) - for (int i = 0; i < 2000; i++) { - writers.submit(new Writer(500/*nwrite*/)); - for (int rdrs = 0; rdrs < 60; rdrs++) { - readers.submit(new Reader(20/*nread*/)); - } - } - - // let the writers run riot for a time - Thread.sleep(30 * 1000); - writers.shutdownNow(); - readers.shutdownNow(); - writers.awaitTermination(5, TimeUnit.SECONDS); - readers.awaitTermination(5, TimeUnit.SECONDS); - { - Exception ex = failex.get(); - if (ex != null) { - log.error(failex.get()); - fail("Test failed", ex); - } - } - System.out.println("Statements written: " + writes.get() + ", read: " + reads.get()); + ExecutorService writers = null; + ExecutorService readers = null; + try { + + writers = Executors.newSingleThreadExecutor(DaemonThreadFactory + .defaultThreadFactory()); + + readers = Executors.newFixedThreadPool(nthreads, + DaemonThreadFactory.defaultThreadFactory()); + + // let's schedule a few writers and readers (more than needed) + for (int i = 0; i < 3000; i++) { + writers.submit(new Writer(500/* nwrite */)); + for (int rdrs = 0; rdrs < 60; rdrs++) { + readers.submit(new Reader(20/* nread */)); + } + } + + // let the writers run riot for a time + Thread.sleep(60 * 1000); + writers.shutdownNow(); + readers.shutdownNow(); + writers.awaitTermination(5, TimeUnit.SECONDS); + readers.awaitTermination(5, TimeUnit.SECONDS); + { + final Throwable ex = failex.get(); + if (ex != null) { + fail("Test failed: firstCause=" + ex, ex); + } + } + if (log.isInfoEnabled()) + log.info("Statements written: " + writes.get() + ", read: " + + reads.get()); + } finally { + if (writers != null) + writers.shutdownNow(); + if (readers != null) + readers.shutdownNow(); + } } finally { sail.__tearDownUnitTest(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |