From: <mar...@us...> - 2011-06-21 16:08:12
Revision: 4754
          http://bigdata.svn.sourceforge.net/bigdata/?rev=4754&view=rev
Author:   martyncutcher
Date:     2011-06-21 16:08:06 +0000 (Tue, 21 Jun 2011)

Log Message:
-----------
ensure cache stats are maintained correctly, and that the current referenced cache is never also on the free or dirty lists

Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java	2011-06-21 15:23:36 UTC (rev 4753)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java	2011-06-21 16:08:06 UTC (rev 4754)
@@ -130,10 +130,6 @@
      * {@link #flush(boolean, long, TimeUnit)}, {@link #reset()}, and
      * {@link #close()}.
      */
-//   * <p>
-//   * Note: To avoid lock ordering problems, acquire the read lock before you
-//   * increment the latch and acquire the write lock before you await the
-//   * latch.
     final private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

     /**
@@ -651,7 +647,8 @@
             // header block.
             if (m_written) {
                 // should be clean, NO WAY should this be written to!
-                log.warn("Writing to CLEAN cache: " + hashCode());
+                log.error("Writing to CLEAN cache: " + hashCode());
+                throw new IllegalStateException("Writing to CLEAN cache: " + hashCode());
             }

             if (data == null)
@@ -1051,21 +1048,27 @@
             // write the data on the disk file.
             final boolean ret = writeOnChannel(view, getFirstOffset(),
                     Collections.unmodifiableMap(recordMap), remaining);
+
+            if (!ret) {
+                throw new IllegalStateException("Unable to flush WriteCache");
+            }

-            if(!ret)
-                throw new TimeoutException();
-
             counters.nflush++;

-            if (ret && reset) {
+            if (reset) {

                 /*
                  * Atomic reset while holding the lock to prevent new
                  * records from being written onto the buffer concurrently.
+                 *
+                 * FIXME: If the WriteCache is used directly then this makes
+                 * sense, but if called from WriteCacheService then this must
+                 * always clear the "master" recordMap of the WriteCacheService
+                 *
                  */
-
+
                 reset();
-
+
             }

             return ret;
@@ -1191,6 +1194,41 @@
      */
     public void reset() throws InterruptedException {
+        final Iterator<Long> entries = recordMap.keySet().iterator();
+
+        if (serviceRecordMap != null && entries.hasNext()) {
+            if (log.isInfoEnabled())
+                log.info("resetting existing WriteCache: nrecords=" + recordMap.size() + ", hashCode=" + hashCode());
+
+            while (entries.hasNext()) {
+                final Long addr = entries.next();
+
+                /*
+                 * We need to guard against the possibility that the entry in
+                 * the service record map has been updated concurrently such
+                 * that it now points to a different WriteCache instance. This
+                 * is possible (for the RWStore) if a recently freed record has
+                 * been subsequently reallocated on a different WriteCache.
+                 * Using the conditional remove on ConcurrentMap guards against
+                 * this.
+                 */
+                boolean removed = serviceRecordMap.remove(addr, this);
+
+                registerWriteStatus(addr, 0, removed ? 'R' : 'L');
+
+            }
+
+        } else {
+            if (log.isInfoEnabled())
+                log.info("clean WriteCache: hashCode=" + hashCode()); // debug
+                                                                      // to
+                                                                      // see
+                                                                      // recycling
+            if (m_written) {
+                log.warn("Written WriteCache but with no records");
+            }
+        }
+
         final Lock writeLock = lock.writeLock();

         writeLock.lockInterruptibly();
@@ -1294,7 +1332,6 @@
      *
      * @param tmp
      */
-    // ... and having the {@link #latch} at zero.
     private void _resetState(final ByteBuffer tmp) {

         if (tmp == null)
@@ -1865,24 +1902,26 @@
                     }
                 }
             } // synchronized(tmp)
+
+            /*
+             * Fix up the debug flag when last address is cleared.
+             */
+            if (m_written && recordMap.isEmpty()) {
+                m_written = false;
+            }
         } finally {
             release();
         }
     }
-
-        /*
-         * Fix up the debug flag when last address is cleared.
-         */
-        if (m_written && recordMap.isEmpty()) {
-            m_written = false;
-        }
     }

     protected void registerWriteStatus(long offset, int length, char action) {
-        // NOP to be overridden for debug if required
+        // NOP to be overidden for debug if required
     }

     boolean m_written = false;
+
+    ConcurrentMap<Long, WriteCache> serviceRecordMap;

     private long lastOffset;
@@ -1890,7 +1929,7 @@
      * Called to clear the WriteCacheService map of references to this
      * WriteCache.
      *
-     * @param recordMap
+     * @param serviceRecordMap
      *            the map of the WriteCacheService that associates an address
      *            with a WriteCache
      * @param fileExtent
@@ -1900,40 +1939,8 @@
      */
     public void resetWith(final ConcurrentMap<Long, WriteCache> serviceRecordMap,
             final long fileExtent) throws InterruptedException {

-        final Iterator<Long> entries = recordMap.keySet().iterator();
-        if (entries.hasNext()) {
-            if (log.isInfoEnabled())
-                log.info("resetting existing WriteCache: nrecords=" + recordMap.size() + ", hashCode=" + hashCode());
-
-            while (entries.hasNext()) {
-                final Long addr = entries.next();
-
-                /*
-                 * We need to guard against the possibility that the entry in
-                 * the service record map has been updated concurrently such
-                 * that it now points to a different WriteCache instance. This
-                 * is possible (for the RWStore) if a recently freed record has
-                 * been subsequently reallocated on a different WriteCache.
-                 * Using the conditional remove on ConcurrentMap guards against
-                 * this.
-                 */
-                boolean removed = serviceRecordMap.remove(addr, this);
-
-                registerWriteStatus(addr, 0, removed ? 'R' : 'L');
-
-            }
-
-        } else {
-            if (log.isInfoEnabled())
-                log.info("clean WriteCache: hashCode=" + hashCode()); // debug
-                                                                      // to
-                                                                      // see
-                                                                      // recycling
-            if (m_written) {
-                log.warn("Written WriteCache but with no records");
-            }
-        }
-
+        this.serviceRecordMap = serviceRecordMap;
+        reset(); // must ensure reset state even if cache already empty

         setFileExtent(fileExtent);

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java	2011-06-21 15:23:36 UTC (rev 4753)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java	2011-06-21 16:08:06 UTC (rev 4754)
@@ -439,9 +439,13 @@
         // save the current file extent.
         this.fileExtent.set(fileExtent);

-        // N-1 WriteCache instances.
-        for (int i = 0; i < nbuffers - 1; i++) {
+        // Add [current] WriteCache.
+        current.set(buffers[0] = newWriteCache(null/* buf */,
+                useChecksum, false/* bufferHasData */, opener));

+        // add remaining buffers.
+        for (int i = 1; i < nbuffers; i++) {
+
             final WriteCache tmp = newWriteCache(null/* buf */,
                     useChecksum, false/* bufferHasData */, opener);
@@ -451,9 +455,6 @@
         }

-        // One more WriteCache for [current].
-        current.set(buffers[nbuffers - 1] = newWriteCache(null/* buf */,
-                useChecksum, false/* bufferHasData */, opener));

         // Set the same counters object on each of the write cache instances.
         final WriteCacheServiceCounters counters = new WriteCacheServiceCounters(
@@ -613,6 +614,7 @@
                     // Now written, remove from dirtylist.
                     dirtyList.take();
+                    counters.get().ndirty--;

                     dirtyListLock.lockInterruptibly();
                     try {
@@ -910,22 +912,22 @@
                 t.reset();

             }

-            // re-populate the clean list with N-1 of our buffers
-            for (int i = 0; i < buffers.length - 1; i++) {
-                cleanList.put(buffers[i]);
-            }
-
             // clear the service record map.
             recordMap.clear();

             // set the current buffer.
-            current.set(buffers[buffers.length - 1]);
+            current.set(buffers[0]);

+            // re-populate the clean list with remaining buffers
+            for (int i = 1; i < buffers.length; i++) {
+                cleanList.put(buffers[i]);
+            }
+
             // reset the counters.
             {
                 final WriteCacheServiceCounters c = counters.get();
                 c.ndirty = 0;
-                c.nclean = buffers.length;
+                c.nclean = buffers.length-1;
                 c.nreset++;
             }
@@ -1253,7 +1255,7 @@
             if (!writeLock.tryLock(remaining, TimeUnit.NANOSECONDS))
                 throw new TimeoutException();
             try {
-                final WriteCache tmp = current.get();
+                final WriteCache tmp = current.getAndSet(null);
                 if (tmp.remaining() == 0) {
                     /*
                      * Handle an empty buffer by waiting until the dirtyList is
@@ -1293,6 +1295,7 @@
                      * code is much less complex here.
                      */
                     dirtyList.add(tmp);
+                    counters.get().ndirty++;
                     dirtyListNotEmpty.signalAll();
                     while (!dirtyList.isEmpty() && !halt) {
                         // remaining := (total - elapsed).
@@ -1326,6 +1329,7 @@
                 }
                 // Guaranteed available hence non-blocking.
                 final WriteCache nxt = cleanList.take();
+                counters.get().nclean--;
                 nxt.resetWith(recordMap, fileExtent.get());
                 current.set(nxt);
                 return true;
@@ -1440,6 +1444,9 @@
             throw new IllegalArgumentException(
                     AbstractBufferStrategy.ERR_BUFFER_NULL);

+        // maintain nwrites
+        counters.get().nwrites++;
+
         // #of bytes in the record.
         final int remaining = data.remaining();
@@ -1481,7 +1488,7 @@
                     // A duplicate may also be indicative of an allocation
                     // error, which we need to be pretty strict about!
                     if (old == cache) {
-                        throw new AssertionError("Record already in cache: offset=" + offset+" "+addrDebugInfo(offset));
+                        throw new AssertionError("Record already in cache: offset=" + offset + " " + addrDebugInfo(offset));
                     }

                     return true;
@@ -1535,7 +1542,7 @@
                  */
                 if (recordMap.put(offset, cache) != null) {
                     // The record should not already be in the cache.
-                    throw new AssertionError("Record already in cache: offset=" + offset+" "+addrDebugInfo(offset));
+                    throw new AssertionError("Record already in cache: offset=" + offset + " " + addrDebugInfo(offset));
                 }

                 return true;
@@ -1600,7 +1607,7 @@
                     // Take a buffer from the cleanList (guaranteed avail).
                     final WriteCache newBuffer = cleanList.take();
-
+                    counters.get().nclean--;
                     // Clear the state on the new buffer and remove from
                     // cacheService map
                     newBuffer.resetWith(recordMap, fileExtent.get());
@@ -1613,7 +1620,7 @@
                     // This must be the only occurrence of this record.
                     if (recordMap.put(offset, cache) != null) {
-                        throw new AssertionError("Record already in cache: offset=" + offset+" "+addrDebugInfo(offset));
+                        throw new AssertionError("Record already in cache: offset=" + offset + " " + addrDebugInfo(offset));
                     }

                     return true;
@@ -1629,7 +1636,7 @@
             /*
              * Should never happen.
              */
-            throw new AssertionError("Unable to write into current WriteCache");
+            throw new AssertionError("Unable to write into current WriteCache " + offset + " " + addrDebugInfo(offset));

         } finally {
@@ -1861,8 +1868,9 @@
         if (!lock.isWriteLockedByCurrentThread())
             throw new IllegalMonitorStateException();

-        final WriteCache cache = current.get();
+        final WriteCache cache = current.getAndSet(null);
        assert cache != null;
+
         /*
          * Note: The lock here is required to give flush() atomic semantics with
          * regard to the set of dirty write buffers when flush() gained the
@@ -1897,6 +1905,7 @@
             // Take a buffer from the cleanList (guaranteed avail).
             final WriteCache newBuffer = cleanList.take();
+            counters.get().nclean--;

             // Clear state on new buffer and remove from cacheService map
             newBuffer.resetWith(recordMap, fileExtent.get());
@@ -1989,18 +1998,24 @@
      */
     public boolean clearWrite(final long offset) {
         try {
+            counters.get().nclearRequests++;
             final WriteCache cache = recordMap.remove(offset);
             if (cache == null)
                 return false;
-            final WriteCache cur = acquireForWriter(); // in case current
+
+            // Is there any point in acquiring for writer (with the readLock)?
+            // It prevents concurrent access with the write method that takes
+            // the writeLock, but is this a problem?
+            //final WriteCache cur = acquireForWriter(); // in case current
+            counters.get().nclears++;
+            //try {
             debugAddrs(offset, 0, 'F');
-            try {
             cache.clearAddrMap(offset);
             return true;
-            } finally {
-                release();
-            }
+            //} finally {
+            //  release();
+            //}
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
@@ -2100,7 +2115,19 @@
          * The #of {@link WriteCache} blocks sent by the leader to the first
          * downstream follower.
          */
-        public volatile long nsend;
+        public volatile long nsend;
+        /**
+         * The #of writes made to the writeCacheService.
+         */
+        public volatile long nwrites;
+        /**
+         * The #of addresses cleared by the writeCacheService.
+         */
+        public volatile long nclearRequests;
+        /**
+         * The #of addresses cleared by the writeCacheService.
+         */
+        public volatile long nclears;

         public WriteCacheServiceCounters(final int nbuffers) {
@@ -2145,6 +2172,24 @@
                 }
             });

+            root.addCounter("nwrites", new Instrument<Long>() {
+                public void sample() {
+                    setValue(nwrites);
+                }
+            });
+
+            root.addCounter("nclearRequests", new Instrument<Long>() {
+                public void sample() {
+                    setValue(nclearRequests);
+                }
+            });
+
+            root.addCounter("nclears", new Instrument<Long>() {
+                public void sample() {
+                    setValue(nclears);
+                }
+            });
+
             root.addCounter("mbPerSec", new Instrument<Double>() {
                 public void sample() {
                     final double mbPerSec = (((double) bytesWritten)

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java	2011-06-21 15:23:36 UTC (rev 4753)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java	2011-06-21 16:08:06 UTC (rev 4754)
@@ -557,7 +557,16 @@
         final int block = offset/nbits;

-        m_sessionActive = m_store.isSessionProtected();
+        /**
+         * When a session is released any m_sessionActive FixedAllocators
+         * should be atomically released.
+         * However, if any state allowed a call to free once the store
+         * is not session protected, this must NOT overwrite m_sessionActive
+         * if it is already set since a commit would reset the transient bits
+         * without first clearing addresses them from the writeCacheService
+         */
+
+        m_sessionActive = m_sessionActive || m_store.isSessionProtected();

         try {
             if (((AllocBlock) m_allocBlocks.get(block))

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2011-06-21 15:23:36 UTC (rev 4753)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2011-06-21 16:08:06 UTC (rev 4754)
@@ -4545,5 +4545,9 @@
         log.warn("WriteCacheDebug: " + paddr + " - " + m_writeCache.addrDebugInfo(paddr));
     }

+    public CounterSet getWriteCacheCounters() {
+        return m_writeCache.getCounters();
+    }
+
 }

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
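For reference, the concurrency guard described in the new WriteCache.reset() comment above rests on the standard two-argument ConcurrentMap.remove(key, value), which removes an entry only while it still maps to the given value. The sketch below is illustrative only and is not part of the commit: the map values are hypothetical stand-in objects rather than the real com.bigdata.io.writecache.WriteCache class, and the variable names are invented for the example.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConditionalRemoveSketch {

    public static void main(String[] args) {

        // Hypothetical stand-ins for two write cache buffers.
        final Object cacheA = new Object();
        final Object cacheB = new Object();

        // Plays the role of the service-level record map (address -> cache).
        final ConcurrentMap<Long, Object> serviceRecordMap =
                new ConcurrentHashMap<Long, Object>();

        final Long addr = 1024L;
        serviceRecordMap.put(addr, cacheA);

        // Simulate a concurrent free/re-allocation: the address is re-mapped
        // to a different buffer before cacheA gets around to resetting itself.
        serviceRecordMap.replace(addr, cacheA, cacheB);

        // Conditional remove: succeeds only if addr still maps to cacheA,
        // so cacheA's reset cannot clobber cacheB's entry.
        final boolean removed = serviceRecordMap.remove(addr, cacheA);

        System.out.println("removed by cacheA = " + removed);  // prints false
        System.out.println("cacheB entry intact = "
                + (serviceRecordMap.get(addr) == cacheB));      // prints true
    }
}

Run as a plain Java program, this prints "removed by cacheA = false" and "cacheB entry intact = true", which is the behaviour the patch relies on to keep a recycled address owned by its new WriteCache.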