From: <tho...@us...> - 2010-09-03 12:32:28
|
Revision: 3504 http://bigdata.svn.sourceforge.net/bigdata/?rev=3504&view=rev Author: thompsonbry Date: 2010-09-03 12:32:22 +0000 (Fri, 03 Sep 2010) Log Message: ----------- Updated the javadoc on the WriteCache to clarify the conditions which require additional synchronization to protect the ByteBuffer's position() from concurrency related side effects. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-09-03 11:48:08 UTC (rev 3503) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-09-03 12:32:22 UTC (rev 3504) @@ -134,20 +134,28 @@ */ final private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - /** - * Return the backing {@link ByteBuffer}. The caller may read or write on - * the buffer. Once they are done, the caller MUST call {@link #release()}. - * This uses the read lock to allow concurrent read/write operations on the - * backing buffer. Note that at most one write operation may execute - * concurrently in order to avoid side effects on the buffers position when - * copying data onto the buffer. - * - * @return The {@link ByteBuffer}. - * - * @throws InterruptedException - * @throws IllegalStateException - * if the {@link WriteCache} is closed. - */ + /** + * Return the backing {@link ByteBuffer}. The caller may read or write on + * the buffer, but MUST NOT have a side effect on the + * {@link ByteBuffer#position()} without first synchronizing on the + * {@link ByteBuffer}. Once they are done, the caller MUST call + * {@link #release()}. + * <p> + * Note: This uses the read lock to allow concurrent read/write operations + * on the backing buffer. + * <p> + * Note: <strong>At most one write operation may execute concurrently in + * order to avoid side effects on the buffers position when copying data + * onto the buffer. This constraint must be imposed by the caller using a + * <code>synchronized(buf){}</code> block during the critical sections where + * the buffer position will be updated by a write. </strong> + * + * @return The {@link ByteBuffer}. + * + * @throws InterruptedException + * @throws IllegalStateException + * if the {@link WriteCache} is closed. + */ private ByteBuffer acquire() throws InterruptedException, IllegalStateException { final Lock readLock = lock.readLock(); @@ -668,12 +676,15 @@ if (remaining == 0) throw new IllegalArgumentException(AbstractBufferStrategy.ERR_BUFFER_EMPTY); - /* - * Note: We need to be synchronized on the ByteBuffer here since - * this operation relies on the position() being stable. - * - * Note: No other code adjust touch the buffer's position() !!! - */ + /* + * Note: We need to be synchronized on the ByteBuffer here since + * this operation relies on the position() being stable. + * + * Note: Also see clearAddrMap(long) which is synchronized on the + * acquired ByteBuffer in the same manner to protect it during + * critical sections which have a side effect on the buffer + * position. + */ final int pos; synchronized (tmp) { @@ -730,7 +741,7 @@ counters.naccept++; counters.bytesAccepted += nwrite; - } + } // synchronized(tmp) /* * Add metadata for the record so it can be read back from the @@ -1737,21 +1748,26 @@ // } final ByteBuffer tmp = acquire(); try { - if (tmp.remaining() >= 12) { - // We must synchronize - synchronized (tmp) { - int spos = tmp.position(); - tmp.putLong(addr); - tmp.putInt(0); - if (checker != null) { - // update the checksum (no side-effects on [data]) - ByteBuffer chkBuf = tmp.asReadOnlyBuffer(); - chkBuf.position(spos); - chkBuf.limit(tmp.position()); - checker.update(chkBuf); - } - } - } + if (tmp.remaining() >= 12) { + /* + * Note: We must synchronize before having a side effect on + * position. Also see write(...) which is synchronized on + * the buffer during critical sections which have a side + * effect on the buffer position. + */ + synchronized (tmp) { + final int spos = tmp.position(); + tmp.putLong(addr); + tmp.putInt(0); + if (checker != null) { + // update the checksum (no side-effects on [data]) + final ByteBuffer chkBuf = tmp.asReadOnlyBuffer(); + chkBuf.position(spos); + chkBuf.limit(tmp.position()); + checker.update(chkBuf); + } + } // synchronized(tmp) + } } finally { release(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-09-14 10:57:28
|
Revision: 3541 http://bigdata.svn.sourceforge.net/bigdata/?rev=3541&view=rev Author: thompsonbry Date: 2010-09-14 10:57:21 +0000 (Tue, 14 Sep 2010) Log Message: ----------- Reorganized to synchronize on the buffer before testing remaining() since that depends on position(). This addresses the following stack trace for the RWStore. Caused by: java.nio.BufferOverflowException > [java] at java.nio.Buffer.nextPutIndex(Buffer.java:501) > [java] at java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:736) > [java] at com.bigdata.io.writecache.WriteCache.clearAddrMap(WriteCache.java:1761) > [java] at com.bigdata.io.writecache.WriteCacheService.clearWrite(WriteCacheService.java:1966) > [java] at com.bigdata.rwstore.RWStore.immediateFree(RWStore.java:1154) > [java] at com.bigdata.rwstore.RWStore.free(RWStore.java:1126) > [java] at com.bigdata.journal.RWStrategy.delete(RWStrategy.java:321) > [java] at com.bigdata.journal.RWStrategy.delete(RWStrategy.java:309) > [java] at com.bigdata.journal.AbstractJournal.delete(AbstractJournal.java:2625) > [java] at com.bigdata.btree.Node.replaceChildRef(Node.java:870) > [java] at com.bigdata.btree.AbstractNode.copyOnWrite(AbstractNode.java:546) > [java] at com.bigdata.btree.AbstractNode.copyOnWrite(AbstractNode.java:417) > [java] at com.bigdata.btree.Leaf.insert(Leaf.java:490) > [java] at com.bigdata.btree.Node.insert(Node.java:913) > [java] at com.bigdata.btree.Node.insert(Node.java:913) > [java] at com.bigdata.btree.Node.insert(Node.java:913) > [java] at com.bigdata.btree.AbstractBTree.insert(AbstractBTree.java:2046) > [java] at com.bigdata.btree.AbstractBTree.insert(AbstractBTree.java:1990) > [java] at com.bigdata.rdf.spo.SPOIndexWriteProc.apply(SPOIndexWriteProc.java:247) > [java] at com.bigdata.btree.UnisolatedReadWriteIndex.submit(UnisolatedReadWriteIndex.java:796) > [java] at com.bigdata.rdf.spo.SPOIndexWriter.call(SPOIndexWriter.java:332) > [java] at com.bigdata.rdf.spo.SPOIndexWriter.call(SPOIndexWriter.java:69) > [java] at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) > [java] at java.util.concurrent.FutureTask.run(FutureTask.java:138) > [java] at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) > [java] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) > [java] at java.lang.Thread.run(Thread.java:619) Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-09-13 21:00:14 UTC (rev 3540) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-09-14 10:57:21 UTC (rev 3541) @@ -1749,26 +1749,27 @@ // } final ByteBuffer tmp = acquire(); try { - if (tmp.remaining() >= 12) { - /* - * Note: We must synchronize before having a side effect on - * position. Also see write(...) which is synchronized on - * the buffer during critical sections which have a side - * effect on the buffer position. - */ - synchronized (tmp) { - final int spos = tmp.position(); - tmp.putLong(addr); - tmp.putInt(0); - if (checker != null) { - // update the checksum (no side-effects on [data]) - final ByteBuffer chkBuf = tmp.asReadOnlyBuffer(); - chkBuf.position(spos); - chkBuf.limit(tmp.position()); - checker.update(chkBuf); - } - } // synchronized(tmp) - } + /* + * Note: We must synchronize before having a side effect on + * position (which includes depending on remaining()). Also see + * write(...) which is synchronized on the buffer during + * critical sections which have a side effect on the buffer + * position. + */ + synchronized (tmp) { + if (tmp.remaining() >= 12) { + final int spos = tmp.position(); + tmp.putLong(addr); + tmp.putInt(0); + if (checker != null) { + // update the checksum (no side-effects on [data]) + final ByteBuffer chkBuf = tmp.asReadOnlyBuffer(); + chkBuf.position(spos); + chkBuf.limit(tmp.position()); + checker.update(chkBuf); + } + } + } // synchronized(tmp) } finally { release(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |