From: <mar...@us...> - 2011-06-21 17:19:12
|
Revision: 4756 http://bigdata.svn.sourceforge.net/bigdata/?rev=4756&view=rev Author: martyncutcher Date: 2011-06-21 17:19:05 +0000 (Tue, 21 Jun 2011) Log Message: ----------- backout change to writeCache retaining serviceRecordMap and instead drop interface to force calling of flushWithRest Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2011-06-21 16:21:38 UTC (rev 4755) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2011-06-21 17:19:05 UTC (rev 4756) @@ -927,27 +927,6 @@ } /** - * Variant which resets the cache if it was successfully flushed. - */ - public void flushAndReset(final boolean force) throws IOException, InterruptedException { - - try { - - if (!flushAndReset(force, true/* reset */, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { - - throw new RuntimeException(); - - } - - } catch (TimeoutException e) { - - throw new RuntimeException(e); - - } - - } - - /** * Flush the writes to the backing channel but DOES NOT sync the channel and * DOES NOT {@link #reset()} the {@link WriteCache}. {@link #reset()} is a * separate operation because a common use is to retain recently flushed @@ -964,30 +943,6 @@ public boolean flush(final boolean force, final long timeout, final TimeUnit unit) throws IOException, TimeoutException, InterruptedException { - return flushAndReset(force, false/* reset */, timeout, unit); - - } - - /** - * Core impl. - * - * @param forceIsIgnored - * ignored (deprecated). - * @param reset - * When <code>true</code>, does atomic reset IFF the flush was - * successful in the allowed time while holding the lock to - * prevent new records from being written onto the buffer - * concurrently. - * @param timeout - * @param unit - * @return - * @throws IOException - * @throws TimeoutException - * @throws InterruptedException - */ - private boolean flushAndReset(final boolean forceIsIgnored, final boolean reset, final long timeout, - final TimeUnit unit) throws IOException, TimeoutException, InterruptedException { - // start time final long begin = System.nanoTime(); @@ -1050,27 +1005,11 @@ remaining); if (!ret) { - throw new IllegalStateException("Unable to flush WriteCache"); + throw new TimeoutException("Unable to flush WriteCache"); } counters.nflush++; - if (reset) { - - /* - * Atomic reset while holding the lock to prevent new - * records from being written onto the buffer concurrently. - * - * FIXME: If the WriteCache is used directly then this makes - * sense, but if called from WriteCacheService then this must - * always clear the "master" recordMap of the WriteCacheService - * - */ - - reset(); - - } - return ret; } @@ -1194,41 +1133,6 @@ */ public void reset() throws InterruptedException { - final Iterator<Long> entries = recordMap.keySet().iterator(); - - if (serviceRecordMap != null && entries.hasNext()) { - if (log.isInfoEnabled()) - log.info("resetting existing WriteCache: nrecords=" + recordMap.size() + ", hashCode=" + hashCode()); - - while (entries.hasNext()) { - final Long addr = entries.next(); - - /* - * We need to guard against the possibility that the entry in - * the service record map has been updated concurrently such - * that it now points to a different WriteCache instance. This - * is possible (for the RWStore) if a recently freed record has - * been subsequently reallocated on a different WriteCache. - * Using the conditional remove on ConcurrentMap guards against - * this. - */ - boolean removed = serviceRecordMap.remove(addr, this); - - registerWriteStatus(addr, 0, removed ? 'R' : 'L'); - - } - - } else { - if (log.isInfoEnabled()) - log.info("clean WriteCache: hashCode=" + hashCode()); // debug - // to - // see - // recycling - if (m_written) { - log.warn("Written WriteCache but with no records"); - } - } - final Lock writeLock = lock.writeLock(); writeLock.lockInterruptibly(); @@ -1921,8 +1825,6 @@ boolean m_written = false; - ConcurrentMap<Long, WriteCache> serviceRecordMap; - private long lastOffset; /** @@ -1939,8 +1841,40 @@ public void resetWith(final ConcurrentMap<Long, WriteCache> serviceRecordMap, final long fileExtent) throws InterruptedException { - this.serviceRecordMap = serviceRecordMap; + final Iterator<Long> entries = recordMap.keySet().iterator(); + if (serviceRecordMap != null && entries.hasNext()) { + if (log.isInfoEnabled()) + log.info("resetting existing WriteCache: nrecords=" + recordMap.size() + ", hashCode=" + hashCode()); + + while (entries.hasNext()) { + final Long addr = entries.next(); + + /* + * We need to guard against the possibility that the entry in + * the service record map has been updated concurrently such + * that it now points to a different WriteCache instance. This + * is possible (for the RWStore) if a recently freed record has + * been subsequently reallocated on a different WriteCache. + * Using the conditional remove on ConcurrentMap guards against + * this. + */ + boolean removed = serviceRecordMap.remove(addr, this); + + registerWriteStatus(addr, 0, removed ? 'R' : 'L'); + + } + + } else { + if (log.isInfoEnabled()) + log.info("clean WriteCache: hashCode=" + hashCode()); // debug + // to + // see + // recycling + if (m_written) { + log.warn("Written WriteCache but with no records"); + } + } reset(); // must ensure reset state even if cache already empty setFileExtent(fileExtent); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-06-21 16:21:38 UTC (rev 4755) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-06-21 17:19:05 UTC (rev 4756) @@ -565,8 +565,9 @@ * if it is already set since a commit would reset the transient bits * without first clearing addresses them from the writeCacheService */ - - m_sessionActive = m_sessionActive || m_store.isSessionProtected(); + final boolean tmp = m_sessionActive; + m_sessionActive = m_store.isSessionProtected(); + if (tmp && !m_sessionActive) throw new AssertionError(); try { if (((AllocBlock) m_allocBlocks.get(block)) @@ -866,7 +867,7 @@ m_statsBucket = b; } - public void releaseSession(RWWriteCacheService cache) { + void releaseSession(RWWriteCacheService cache) { if (m_context != null) { throw new IllegalStateException("Calling releaseSession on shadowed allocator"); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |