From: <mar...@us...> - 2011-06-24 14:41:21
|
Revision: 4789 http://bigdata.svn.sourceforge.net/bigdata/?rev=4789&view=rev Author: martyncutcher Date: 2011-06-24 14:41:14 +0000 (Fri, 24 Jun 2011) Log Message: ----------- Fixes a problem with session protection where releaseSession failed to clear from the writeCache those writes that had been freed but were written prior to the last commit. It also fixes a problem where releaseSession failed to maintain an accurate count of the current free bits, which could result in the Allocator on the free list not having any free space. The test has been refined to fail fast. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestMROWTransactions.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2011-06-24 12:31:19 UTC (rev 4788) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2011-06-24 14:41:14 UTC (rev 4789) @@ -363,8 +363,11 @@ if (m_addr != 0) { // check active! for (int i = 0; i < m_live.length; i++) { int chkbits = m_transients[i]; + // check all addresses set in m_transients NOT set in m_live + chkbits &= ~m_live[i]; + + // reset transients to live OR commit m_transients[i] = m_live[i] | m_commit[i]; - chkbits &= ~m_transients[i]; final int startBit = i * 32; @@ -384,4 +387,92 @@ return sb.toString(); } + /** + * @return number of bits that will be cleared in a session release + */ + int sessionBits() { + int freebits = 0; + + if (m_addr != 0) { // check active! 
+ for (int i = 0; i < m_live.length; i++) { + int chkbits = m_transients[i]; + if (chkbits != 0) { + // chkbits &= ~(m_live[i] | m_commit[i]); + chkbits &= ~m_live[i]; + + if (chkbits != 0) { + // there are writes to clear + for (int b = 0; b < 32; b++) { + if ((chkbits & (1 << b)) != 0) { + freebits++; + } + } + } + } + } + } + + return freebits; + } + + /** + * @return number of bits immediately available for allocation + */ + int freeBits() { + int freebits = 0; + + if (m_addr != 0) { // check active! + for (int i = 0; i < m_live.length; i++) { + int chkbits = ~m_transients[i]; + + if (chkbits != 0) { + if (chkbits == 0xFFFFFFFF) { + freebits += 32; + } else { + for (int b = 0; b < 32; b++) { + if ((chkbits & (1 << b)) != 0) { + freebits++; + } + } + } + } + } + } else { + freebits += m_live.length * 32; + } + + return freebits; + } + + /** + * transients frees as defined by those bits set in transients but NOT set + * in live + * @return number of transient frees + */ + int transientBits() { + int freebits = 0; + + if (m_addr != 0) { // check active! 
+ for (int i = 0; i < m_live.length; i++) { + int chkbits = m_transients[i] & ~m_live[i]; + + if (chkbits != 0) { + if (chkbits == 0xFFFFFFFF) { + freebits += 32; + } else { + for (int b = 0; b < 32; b++) { + if ((chkbits & (1 << b)) != 0) { + freebits++; + } + } + } + } + } + } else { + freebits += m_live.length * 32; + } + + return freebits; + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-06-24 12:31:19 UTC (rev 4788) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-06-24 14:41:14 UTC (rev 4789) @@ -25,6 +25,7 @@ package com.bigdata.rwstore; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.io.*; @@ -252,6 +253,17 @@ } if (!protectTransients) { + /** + * This assert will trip if any address was freed under + * session protection and therefore remained accessible + * until released. 
+ * The value returned by releaseSession should be zero + * since all "frees" should already have removed any + * writes to the writeCacheService + */ + assert m_sessionFrees.intValue() == 0; + // assert block.releaseSession(m_store.m_writeCache) == 0; + block.m_transients = block.m_live.clone(); } @@ -544,6 +556,9 @@ } private boolean m_freeWaiting = true; + + // track number of frees to be cleared on session releases + private AtomicInteger m_sessionFrees = new AtomicInteger(0); public boolean free(final int addr, final int size) { return free(addr, size, false); @@ -566,9 +581,9 @@ * without first clearing addresses them from the writeCacheService */ final boolean tmp = m_sessionActive; - m_sessionActive = m_store.isSessionProtected(); + m_sessionActive = tmp || m_store.isSessionProtected(); if (tmp && !m_sessionActive) throw new AssertionError(); - + try { if (((AllocBlock) m_allocBlocks.get(block)) .freeBit(offset % nbits, m_sessionActive && !overideSession)) { // bit adjust @@ -580,8 +595,23 @@ checkFreeList(); } else { m_freeTransients++; + + if (m_sessionActive) { + boolean assertsEnabled = false; + assert assertsEnabled = true; + if (assertsEnabled){ + final int sessionFrees = m_sessionFrees.incrementAndGet(); + int sessionBits = 0; + for (AllocBlock ab : m_allocBlocks) { + sessionBits += ab.sessionBits(); + } + assert sessionFrees <= sessionBits : "sessionFrees: " + sessionFrees + " > sessionBits: " + sessionBits; + } + } + } + if (m_statsBucket != null) { m_statsBucket.delete(size); } @@ -698,16 +728,14 @@ return value; } else { - if (log.isDebugEnabled()) { - StringBuilder sb = new StringBuilder(); - sb.append("FixedAllocator returning null address, with freeBits: " + m_freeBits + "\n"); - - for (AllocBlock ab: m_allocBlocks) { - sb.append(ab.show() + "\n"); - } - - log.debug(sb); - } + StringBuilder sb = new StringBuilder(); + sb.append("FixedAllocator returning null address, with freeBits: " + m_freeBits + "\n"); + + for (AllocBlock ab: m_allocBlocks) 
{ + sb.append(ab.show() + "\n"); + } + + log.error(sb); return 0; } @@ -873,20 +901,61 @@ } if (this.m_sessionActive) { - if (log.isTraceEnabled()) - log.trace("Allocator: #" + m_index + " releasing session protection"); - - int releasedAllocations = 0; - for (AllocBlock ab : m_allocBlocks) { - releasedAllocations += ab.releaseSession(cache); - } - - m_freeBits += releasedAllocations; - m_freeTransients -= releasedAllocations; - - checkFreeList(); - - m_sessionActive = m_store.isSessionProtected(); + final int start = m_sessionFrees.intValue(); + // try { + if (log.isTraceEnabled()) + log.trace("Allocator: #" + m_index + " releasing session protection"); + + + int releasedAllocations = 0; + for (AllocBlock ab : m_allocBlocks) { + releasedAllocations += ab.releaseSession(cache); + } + + assert !m_store.isSessionProtected() : "releaseSession called with isSessionProtected: true"; + + m_sessionActive = false; // should only need indicate that it contains no cached writes + + + m_freeBits = freebits(); + final int freebits = freebits(); + if (m_freeBits > freebits) + log.error("m_freeBits too high: " + m_freeBits + " > (calc): " + freebits); + + m_freeTransients = transientbits(); + + checkFreeList(); + + // assert m_sessionFrees == releasedAllocations : "Allocator: " + hashCode() + " m_sessionFrees: " + m_sessionFrees + " != released: " + releasedAllocations; + if (start > releasedAllocations) { + log.error("BAD! Allocator: " + hashCode() + ", size: " + m_size + " m_sessionFrees: " + m_sessionFrees.intValue() + " > released: " + releasedAllocations); + } else { + // log.error("GOOD! 
Allocator: " + hashCode() + ", size: " + m_size + " m_sessionFrees: " + m_sessionFrees.intValue() + " <= released: " + releasedAllocations); + } + // } finally { + final int end = m_sessionFrees.getAndSet(0); + assert start == end : "SessionFrees concurrent modification: " + start + " != " + end; + // } + } else { + assert m_sessionFrees.intValue() == 0 : "Session Inactive with sessionFrees: " + m_sessionFrees.intValue(); } } + + private int freebits() { + int freeBits = 0; + for (AllocBlock ab : m_allocBlocks) { + freeBits += ab.freeBits(); + } + + return freeBits; + } + + private int transientbits() { + int freeBits = 0; + for (AllocBlock ab : m_allocBlocks) { + freeBits += ab.transientBits(); + } + + return freeBits; + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-06-24 12:31:19 UTC (rev 4788) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-06-24 14:41:14 UTC (rev 4789) @@ -394,7 +394,7 @@ private final Quorum<?,?> m_quorum; - private final RWWriteCacheService m_writeCache; + final RWWriteCacheService m_writeCache; /** * The actual allocation sizes as read from the store. @@ -1602,8 +1602,6 @@ * transaction protection and isolated AllocationContexts. 
*/ if (this.isSessionProtected()) { - final boolean overrideSession = context != null && alloc.canImmediatelyFree(addr, sze, context); - if (context != null) { if (alloc.canImmediatelyFree(addr, sze, context)) { immediateFree(addr, sze, true); Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestMROWTransactions.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestMROWTransactions.java 2011-06-24 12:31:19 UTC (rev 4788) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestMROWTransactions.java 2011-06-24 14:41:14 UTC (rev 4789) @@ -218,7 +218,7 @@ * has been freed and is no longer protected from recycling when an * attempt is made to read from it. */ - final int nthreads = 10; // up count to increase chance startup condition + final int nthreads = 5; // up count to increase chance startup condition // decrement to increase chance of idle (no sessions) final int nuris = 2000; // number of unique subject/objects final int npreds = 50; // @@ -328,8 +328,19 @@ } finally { ((Journal) origStore.getIndexManager()).abort(txId); } - } catch (Throwable t) { - log.error(t, t); + } catch (Throwable ise) { + if (!InnerCause.isInnerCause(ise, + InterruptedException.class)) { + if (failex + .compareAndSet(null/* expected */, ise/* newValue */)) { + log.error("firstCause:" + ise, ise); + } else { + if (log.isInfoEnabled()) + log.info("Other error: " + ise, ise); + } + } else { + // Ignore. 
+ } } return null; } @@ -347,15 +358,20 @@ DaemonThreadFactory.defaultThreadFactory()); // let's schedule a few writers and readers (more than needed) - for (int i = 0; i < 3000; i++) { + // writers.submit(new Writer(5000000/* nwrite */)); + for (int i = 0; i < 5000; i++) { writers.submit(new Writer(500/* nwrite */)); - for (int rdrs = 0; rdrs < 60; rdrs++) { - readers.submit(new Reader(20/* nread */)); + for (int rdrs = 0; rdrs < 20; rdrs++) { + readers.submit(new Reader(60/* nread */)); } } - // let the writers run riot for a time - Thread.sleep(60 * 1000); + // let the writers run riot for a time, checking for failure + for (int i = 0; i < 60; i++) { + Thread.sleep(1000); + if (failex.get() != null) + break; + } writers.shutdownNow(); readers.shutdownNow(); writers.awaitTermination(5, TimeUnit.SECONDS); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |