From: <mar...@us...> - 2010-08-02 09:43:00
|
Revision: 3386 http://bigdata.svn.sourceforge.net/bigdata/?rev=3386&view=rev Author: martyncutcher Date: 2010-08-02 09:42:54 +0000 (Mon, 02 Aug 2010) Log Message: ----------- Add support for management of deferred frees and testing for address allocation Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-08-02 09:40:26 UTC (rev 3385) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-08-02 09:42:54 UTC (rev 3386) @@ -116,7 +116,8 @@ RWStore.clrBit(m_bits, bit); if (!RWStore.tstBit(m_commit, bit)) { - m_writeCache.clearWrite(addr); + // Should not be cleared here! 
+ // m_writeCache.clearWrite(addr); RWStore.clrBit(m_transients, bit); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java 2010-08-02 09:40:26 UTC (rev 3385) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java 2010-08-02 09:42:54 UTC (rev 3386) @@ -39,6 +39,7 @@ public int getDiskAddr(); public void setDiskAddr(int addr); public long getPhysicalAddress(int offset); + public boolean isAllocated(int offset); public int getPhysicalSize(int offset); public byte[] write(); public void read(DataInputStream str); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-08-02 09:40:26 UTC (rev 3385) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-08-02 09:42:54 UTC (rev 3386) @@ -265,4 +265,8 @@ str.append("Index: " + m_index + ", address: " + getStartAddr() + ", BLOB\n"); } + public boolean isAllocated(int offset) { + return m_hdrs[offset] != 0; + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-08-02 09:40:26 UTC (rev 3385) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-08-02 09:42:54 UTC (rev 3386) @@ -372,7 +372,7 @@ } } } - + return false; } @@ -501,4 +501,15 @@ return alloted * m_size; } + + public boolean isAllocated(int offset) { + offset -= 3; + + int allocBlockRange = 32 * m_bitSize; + + AllocBlock block = (AllocBlock) m_allocBlocks.get(offset / 
allocBlockRange); + int bit = offset % allocBlockRange; + + return RWStore.tstBit(block.m_bits, bit); + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-08-02 09:40:26 UTC (rev 3385) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-08-02 09:42:54 UTC (rev 3386) @@ -38,11 +38,14 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.naming.OperationNotSupportedException; + import org.apache.log4j.Logger; import com.bigdata.io.FileChannelUtility; @@ -50,6 +53,8 @@ import com.bigdata.io.writecache.WriteCache; import com.bigdata.journal.ForceEnum; import com.bigdata.journal.IRootBlockView; +import com.bigdata.journal.ITransactionService; +import com.bigdata.journal.JournalTransactionService; import com.bigdata.journal.Options; import com.bigdata.journal.RWStrategy.FileMetadataView; import com.bigdata.quorum.Quorum; @@ -159,6 +164,46 @@ * to determine chained blocks for freeing. This is particularly important * for larger stores where a disk cache could be flushed through simply freeing * BLOB allocations. + * + * Deferred Free List + * + * In order to provide support for read-only transactions across store + * mutations, storage that is free'd in the mutation cannot be free'd immediately. + * + * The deferredFreeList is effectively a table of txReleaseTimes and + * and addresses to data with lists of addresses to be freed. 
+ * + * OTOH, if in general we only ever free storage "in the next transaction" then + * the big advantage of storage reallocation in bulk upload is lost. The logic + * should hold that if we can be sure that no read-only transaction can be + * active then we can free immediately, otherwise we should defer the call to + * complete the free until the commit point of any current transaction has + * been passed. + * + * The logic of determining whether to call free or deferFree is left to the + * client and whatever transaction management policy they follow. The support + * assumes that a safety-first policy of always calling deferFree should not + * cause performance or resource problems. + * + * Since there is a cost of synchronizing with the transaction service to check + * the transaction state, the check is only made either prior to commit or + * after every so many deferFree requests. + * + * The management of the deferredFreeList is via a management block referenced + * from the metaAllocation data. This contains a reference to a block of + * addresses that each reference a block of deferred frees. + * + * At runtime, this list includes the txnReleaseTime when the allocation was + * "freed". + * + * The release times stored are the last commit time at the time of the + * original free request. The current transaction environment covers a window + * between the first release time and the current commit time. When the stored release + * time is before the current first release time then that allocation can + * be safely freed. + * + * When a store is opened this window is not needed and any deferred frees can + * be immediately freed since no transactions will be active. */ public class RWStore implements IStore { @@ -275,9 +320,44 @@ * significant contention may be avoided. */ final private ReentrantLock m_allocationLock = new ReentrantLock(); + + /** + * This lock controls access to the deferredFree structures used + * in deferFree.
+ * + * The deferral of freeing storage supports processing of read-only + * transactions concurrently with modifying/mutation tasks + */ + final private ReentrantLock m_deferFreeLock = new ReentrantLock(); - private ReopenFileChannel m_reopener = null; + /** + * The deferredFreeList is simply an array of (releaseTime, freeListAddrs) pairs + * stored at commit. + * + * Note that when the deferredFreeList is saved, ONLY the freeListAddrs + * are stored, NOT the releaseTime. This is because on any open of + * the store, all deferredFrees can be released immediately. This + * mechanism may be changed in the future to enable explicit history + * retention, but if so a different header structure would be used since + * it would not be appropriate to retain a simple header linked to + * thousands if not millions of commit points. + * + * If the current txn list exceeds the MAX_DEFERRED_FREE then it is + * incrementally saved and a new list begun. The master list itself + * serves as a BLOB header when there is more than a single entry with + * the same txReleaseTime.
+ */ + private static final int MAX_DEFERRED_FREE = 4094; // fits in 16k block + final ArrayList<Long> m_deferredFreeList = new ArrayList<Long>(); + volatile int m_deferredFreeListAddr = 0; + volatile int m_deferredFreeListEntries = 0; + volatile long m_lastTxReleaseTime = 0; + final ArrayList<Integer> m_currentTxnFreeList = new ArrayList<Integer>(); + + volatile long deferredFreeCount = 0; + private ReopenFileChannel m_reopener = null; + class WriteCacheImpl extends WriteCache.FileChannelScatteredWriteCache { public WriteCacheImpl(final ByteBuffer buf, final boolean useChecksum, @@ -304,6 +384,12 @@ } } + + // Added to enable debug of rare problem + // FIXME: disable by removal once solved + protected void registerWriteStatus(long offset, int length, char action) { + m_writeCache.debugAddrs(offset, length, action); + } }; @@ -359,7 +445,8 @@ } int buffers = m_fmv.getFileMetadata().writeCacheBufferCount; - // buffers = 10; + log.warn("RWStore using writeCacheService with buffers: " + buffers); + try { m_writeCache = new RWWriteCacheService( buffers, m_raf @@ -546,12 +633,16 @@ FileChannelUtility.readAll(m_reopener, ByteBuffer.wrap(buf), rawmbaddr); final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); + + final int deferredFreeListAddr = strBuf.readInt(); + final int deferredFreeListEntries = strBuf.readInt(); + int allocBlocks = strBuf.readInt(); m_allocSizes = new int[allocBlocks]; for (int i = 0; i < allocBlocks; i++) { m_allocSizes[i] = strBuf.readInt(); } - m_metaBitsSize = metaBitsStore - allocBlocks - 1; + m_metaBitsSize = metaBitsStore - allocBlocks - 3; // allow for deferred free m_metaBits = new int[m_metaBitsSize]; if (log.isInfoEnabled()) { log.info("Raw MetaBitsAddr: " + rawmbaddr); @@ -573,6 +664,8 @@ readAllocationBlocks(); + clearOutstandingDeferrels(deferredFreeListAddr, deferredFreeListEntries); + if (log.isTraceEnabled()) { final StringBuffer str = new StringBuffer(); this.showAllocators(str); @@ -590,6 +683,55 @@ + 
", " + m_metaBitsAddr); } + /* + * Called when store is opened to make sure any deferred frees are + * cleared. + * + * Stored persistently is only the list of addresses of blocks to be freed, + * the knowledge of the txn release time does not need to be held persistently, + * this is only relevant for transient state while the RWStore is open. + * + * The deferredCount is the number of entries - integer address and integer + * count at each address + */ + private void clearOutstandingDeferrels(final int deferredAddr, final int deferredCount) { + if (deferredAddr != 0) { + assert deferredCount != 0; + final int sze = deferredCount * 8 + 4; // include spce fo checksum + byte[] buf = new byte[sze]; + getData(deferredAddr, buf); + + final byte[] blockBuf = new byte[8 * 1024]; // maximum size required + + ByteBuffer in = ByteBuffer.wrap(buf); + for (int i = 0; i < deferredCount; i++) { + int blockAddr = in.getInt(); + int addrCount = in.getInt(); + + // now read in this block and free all addresses referenced + getData(blockAddr, blockBuf, 0, addrCount*4 + 4); + ByteBuffer inblock = ByteBuffer.wrap(blockBuf); + for (int b = 0; b < addrCount; b++) { + final int defAddr = inblock.getInt(); + Allocator alloc = getBlock(defAddr); + if (alloc instanceof BlobAllocator) { + b++; + assert b < addrCount; + alloc.free(defAddr, inblock.getInt()); + } else { + alloc.free(defAddr, 0); // size ignored for FreeAllocators + } + } + // once read then free the block allocation + free(blockAddr, 0); + } + + // lastly free the deferredAddr + free(deferredAddr, 0); + } + + } + /********************************************************************* * make sure resource is closed! 
**/ @@ -827,10 +969,13 @@ int chk = ChecksumUtility.getCHK().checksum(buf, offset, length-4); // read checksum int tstchk = bb.getInt(offset + length-4); if (chk != tstchk) { + String cacheDebugInfo = m_writeCache.addrDebugInfo(paddr); log.warn("Invalid data checksum for addr: " + paddr + ", chk: " + chk + ", tstchk: " + tstchk + ", length: " + length + ", first byte: " + buf[0] + ", successful reads: " + m_diskReads - + ", at last extend: " + m_readsAtExtend + ", cacheReads: " + m_cacheReads); + + ", at last extend: " + m_readsAtExtend + ", cacheReads: " + m_cacheReads + + ", writeCacheDebug: " + cacheDebugInfo); + throw new IllegalStateException("Invalid data checksum"); } @@ -908,11 +1053,17 @@ m_allocationLock.lock(); try { final Allocator alloc = getBlockByAddress(addr); - final long pa = alloc.getPhysicalAddress(getOffset(addr)); + final int addrOffset = getOffset(addr); + final long pa = alloc.getPhysicalAddress(addrOffset); alloc.free(addr, sze); // must clear after free in case is a blobHdr that requires reading! 
+ // the allocation lock protects against a concurrent re-allocation + // of the address before the cache has been cleared + assert pa != 0; m_writeCache.clearWrite(pa); m_frees++; + if (alloc.isAllocated(addrOffset)) + throw new IllegalStateException("Reallocation problem with WriteCache"); if (!m_commitList.contains(alloc)) { m_commitList.add(alloc); @@ -920,6 +1071,7 @@ } finally { m_allocationLock.unlock(); } + } /** @@ -991,13 +1143,13 @@ addr = allocator.alloc(this, size); - if (log.isDebugEnabled()) - log.debug("New FixedAllocator for " + cmp + " byte allocations at " + addr); + if (log.isTraceEnabled()) + log.trace("New FixedAllocator for " + cmp + " byte allocations at " + addr); m_allocs.add(allocator); } else { // Verify free list only has allocators with free bits - { + if (log.isDebugEnabled()){ int tsti = 0; Iterator<Allocator> allocs = list.iterator(); while (allocs.hasNext()) { @@ -1209,10 +1361,20 @@ * @throws IOException */ private void writeMetaBits() throws IOException { - final int len = 4 * (1 + m_allocSizes.length + m_metaBits.length); + // the metabits is now prefixed by two ints representing the + // deferred frees - the address and number of defer block addresses + // to be found there + final int len = 4 * (2 + 1 + m_allocSizes.length + m_metaBits.length); final byte buf[] = new byte[len]; final FixedOutputStream str = new FixedOutputStream(buf); + + saveDeferrals(); + + str.writeInt(m_deferredFreeListAddr); + str.writeInt(m_deferredFreeListEntries); + + str.writeInt(m_allocSizes.length); for (int i = 0; i < m_allocSizes.length; i++) { str.writeInt(m_allocSizes[i]); @@ -1317,7 +1479,36 @@ // if (m_commitCallback != null) { // m_commitCallback.commitCallback(); // } + + checkDeferredFrees(); + // save deferredFree data + if (m_deferredFreeList.size() > 0) { + final int oldDeferredFreeListAddr = m_deferredFreeListAddr; + // list contains txReleaseTime,addr of addrs to be freed + final int addrs = m_deferredFreeList.size() / 2; + final int 
addrSize = addrs*8; + byte[] deferBuf = new byte[addrSize]; // each addr is a long + ByteBuffer bb = ByteBuffer.wrap(deferBuf); + for (int i = 0; i < addrs; i++) { + bb.putLong(m_deferredFreeList.get(1 + (i*2))); + } + bb.flip(); + m_deferredFreeListAddr = (int) alloc(deferBuf, addrSize); + m_deferredFreeListEntries = addrs; + final int chk = ChecksumUtility.getCHK().checksum(deferBuf); + try { + m_writeCache.write(physicalAddress(m_deferredFreeListAddr), bb, chk); + } catch (IllegalStateException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + free(oldDeferredFreeListAddr, 0); + } else { + m_deferredFreeListAddr = 0; + m_deferredFreeListEntries = 0; + } // Allocate storage for metaBits long oldMetaBits = m_metaBitsAddr; int oldMetaBitsSize = (m_metaBits.length + m_allocSizes.length + 1) * 4; @@ -1387,6 +1578,28 @@ } /** + * Called prior to commit, so check whether storage can be freed and then + * whether the deferredheader needs to be saved. 
+ */ + private void checkDeferredFrees() { + checkFreeable(); + + this.m_deferredFreeListEntries = m_deferredFreeList.size(); + if (m_deferredFreeListEntries > 0) { + int bufSize = 4 + m_deferredFreeListEntries*8; + byte[] buf = new byte[bufSize]; + ByteBuffer bb = ByteBuffer.wrap(buf); + bb.putInt(m_deferredFreeListEntries); + for (int i = 0; i < m_deferredFreeListEntries; i++) { + bb.putLong(m_deferredFreeList.get(i)); + } + m_deferredFreeListAddr = (int) alloc(buf, bufSize); + } else { + this.m_deferredFreeListAddr = 0; + } + } + + /** * * @return conservative requirement for metabits storage, mindful that the * request to allocate the metabits may require an increase in the @@ -1402,6 +1615,8 @@ int allocBlocks = (8 + commitInts)/8; ints += 9 * allocBlocks; + ints += 2; // for deferredFreeListAddr and size + return ints*4; // return as bytes } @@ -2074,8 +2289,8 @@ long ret = physicalAddress((int) m_metaBitsAddr); ret <<= 16; - // include space for allocSizes - int metaBitsSize = m_metaBits.length + m_allocSizes.length + 1; + // include space for allocSizes and deferred free info + int metaBitsSize = 2 + m_metaBits.length + m_allocSizes.length + 1; ret += metaBitsSize; if (log.isInfoEnabled()) @@ -2317,4 +2532,183 @@ return allocated; } + /* + * Adds the address for later freeing to the deferred free list. + * + * This is maintained in a set of chained 4K buffers, with the head and + * tail referenced from the meta allocation block. + * + * If the allocation is for a BLOB then the sze is also stored + * + * The deferred list is checked on AllocBlock and prior to commit. 
+ * + * There is also a possibility to check for deferral at this point, since + * we are passed both the currentCommitTime - against which this free + * will be deferred and the earliest tx start time against which we + * can check to see if + */ + public void deferFree(int rwaddr, int sze, long currentCommitTime) { + + m_deferFreeLock.lock(); + try { + deferredFreeCount++; + if (currentCommitTime != m_lastTxReleaseTime) { + assert m_lastTxReleaseTime == 0; + + m_lastTxReleaseTime = currentCommitTime; + } + m_currentTxnFreeList.add(rwaddr); + + final Allocator alloc = getBlockByAddress(rwaddr); + if (alloc instanceof BlobAllocator) { + m_currentTxnFreeList.add(sze); + } + if (m_currentTxnFreeList.size() >= MAX_DEFERRED_FREE) { + // Save current list and clear + saveDeferrals(); + + assert m_currentTxnFreeList.size() == 0; + } + + // every so many deferrals, check for free + if (deferredFreeCount % 100 == 0) { + checkFreeable(); + } + } finally { + m_deferFreeLock.unlock(); + } + } + + private void checkFreeable() { + if (transactionService == null) { + return; + } + + try { + transactionService.callWithLock(new Callable<Object>() { + + public Object call() throws Exception { + if (transactionService.getActiveCount() == 0) { + freeAllDeferrals(Long.MAX_VALUE); + } else { + freeAllDeferrals(transactionService.getEarliestTxStartTime()); + } + return null; + } + + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + /** + * Frees all storage deferred against a txn release time less than that + * passed in. 
+ * + * @param txnRelease - the max release time + */ + protected void freeAllDeferrals(long txnRelease) { + final Iterator<Long> defers = m_deferredFreeList.iterator(); + while (defers.hasNext() && defers.next() < txnRelease) { + if (txnRelease != Long.MAX_VALUE) { + System.out.println("Freeing deferrals"); // FIXME remove debug + } + defers.remove(); + freeDeferrals(defers.next()); + defers.remove(); + } + if (m_lastTxReleaseTime < txnRelease) { + final Iterator<Integer> curdefers = m_currentTxnFreeList.iterator(); + while (curdefers.hasNext()) { + final int rwaddr = curdefers.next(); + Allocator alloc = getBlock(rwaddr); + if (alloc instanceof BlobAllocator) { + assert curdefers.hasNext(); + + free(rwaddr, curdefers.next()); + } else { + free(rwaddr, 0); // size ignored for FreeAllocators + } + } + m_currentTxnFreeList.clear(); + } + } + + /** + * Writes the content of currentTxnFreeList to the store. + * + * These are the current buffered frees that have yet been saved into + * a block referenced from the deferredFreeList + * + * @return the address of the deferred addresses saved on the store + */ + private void saveDeferrals() { + int addrCount = m_currentTxnFreeList.size(); + + if (addrCount == 0) { + return; + } + + byte[] buf = new byte[4 * (addrCount+1)]; + ByteBuffer out = ByteBuffer.wrap(buf); + out.putInt(addrCount); + for (int i = 0; i < addrCount; i++) { + out.putInt(m_currentTxnFreeList.get(i)); + } + + // now we've saved it to the store, clear the list + m_currentTxnFreeList.clear(); + + long rwaddr = alloc(buf, buf.length); + + rwaddr <<= 32; + rwaddr += addrCount; + + // Now add the reference of this block + m_deferredFreeList.add(m_lastTxReleaseTime); + m_deferredFreeList.add(rwaddr); + + + + } + + /** + * Provided with the address of a block of addresses to be freed + * @param blockAddr + */ + protected void freeDeferrals(long blockAddr) { + int addr = (int) -(blockAddr >> 32); + int sze = (int) blockAddr & 0xFFFFFF; + + byte[] buf = new 
byte[sze+4]; // allow for checksum + getData(addr, buf); + final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); + m_allocationLock.lock(); + try { + int addrs = strBuf.readInt(); + while (addrs-- > 0) { + int nxtAddr = strBuf.readInt(); + Allocator alloc = getBlock(nxtAddr); + if (alloc instanceof BlobAllocator) { + assert addrs > 0; // a Blob address MUST have a size + --addrs; + free(nxtAddr, strBuf.readInt()); + } else { + free(nxtAddr, 0); // size ignored for FreeAllocators + } + } + } catch (IOException e) { + throw new RuntimeException("Problem freeing deferrals", e); + } finally { + m_allocationLock.unlock(); + } + } + + private JournalTransactionService transactionService = null; + public void setTransactionService(final JournalTransactionService transactionService) { + this.transactionService = transactionService; + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |