From: <mar...@us...> - 2010-08-02 09:43:00
|
Revision: 3386 http://bigdata.svn.sourceforge.net/bigdata/?rev=3386&view=rev Author: martyncutcher Date: 2010-08-02 09:42:54 +0000 (Mon, 02 Aug 2010) Log Message: ----------- Add support for management of deferred frees and testing for address allocation Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-08-02 09:40:26 UTC (rev 3385) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-08-02 09:42:54 UTC (rev 3386) @@ -116,7 +116,8 @@ RWStore.clrBit(m_bits, bit); if (!RWStore.tstBit(m_commit, bit)) { - m_writeCache.clearWrite(addr); + // Should not be cleared here! + // m_writeCache.clearWrite(addr); RWStore.clrBit(m_transients, bit); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java 2010-08-02 09:40:26 UTC (rev 3385) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java 2010-08-02 09:42:54 UTC (rev 3386) @@ -39,6 +39,7 @@ public int getDiskAddr(); public void setDiskAddr(int addr); public long getPhysicalAddress(int offset); + public boolean isAllocated(int offset); public int getPhysicalSize(int offset); public byte[] write(); public void read(DataInputStream str); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-08-02 09:40:26 UTC (rev 3385) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-08-02 09:42:54 UTC (rev 3386) @@ -265,4 +265,8 @@ str.append("Index: " + m_index + ", address: " + getStartAddr() + ", BLOB\n"); } + public boolean isAllocated(int offset) { + return m_hdrs[offset] != 0; + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-08-02 09:40:26 UTC (rev 3385) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-08-02 09:42:54 UTC (rev 3386) @@ -372,7 +372,7 @@ } } } - + return false; } @@ -501,4 +501,15 @@ return alloted * m_size; } + + public boolean isAllocated(int offset) { + offset -= 3; + + int allocBlockRange = 32 * m_bitSize; + + AllocBlock block = (AllocBlock) m_allocBlocks.get(offset / allocBlockRange); + int bit = offset % allocBlockRange; + + return RWStore.tstBit(block.m_bits, bit); + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-08-02 09:40:26 UTC (rev 3385) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-08-02 09:42:54 UTC (rev 3386) @@ -38,11 +38,14 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.naming.OperationNotSupportedException; + import org.apache.log4j.Logger; import com.bigdata.io.FileChannelUtility; @@ -50,6 +53,8 @@ import com.bigdata.io.writecache.WriteCache; import com.bigdata.journal.ForceEnum; import com.bigdata.journal.IRootBlockView; +import com.bigdata.journal.ITransactionService; +import com.bigdata.journal.JournalTransactionService; import com.bigdata.journal.Options; import com.bigdata.journal.RWStrategy.FileMetadataView; import com.bigdata.quorum.Quorum; @@ -159,6 +164,46 @@ * to determine chained blocks for freeing. This is particularly important * for larger stores where a disk cache could be flushed through simply freeing * BLOB allocations. + * + * Deferred Free List + * + * In order to provide support for read-only transactions across store + * mutations, storage that is free'd in the mutation cannot be free'd immediately. + * + * The deferredFreeList is effectively a table of txReleaseTimes and + * and addresses to data with lists of addresses to be freed. + * + * OTOH, if in general we only ever free store "in the next transaction" then + * the big advantage of storage reallocation in bulk upload is lost. The logic + * should hold that if we can be sure that no read-only transaction can be + * active then we can free immediately, otherwise we should defer call to + * complete the free until the commit point of any current transaction has + * been passed. + * + * The logic of determining whether to call free or deferFree is left to the + * client and whatever transaction management policy they follow. The support + * assmumes that a safety-first policy of always calling deferFree should not + * cause performance or resource problems. + * + * Since there is a cost of synchronizing with the transaction service to check + * the transaction state, the check is only made either prior to commit or + * after every so many deferFree requests. + * + * The management of the deferredFreeList is via a management block referenced + * from the metaAllocation data. This contains a reference to a block of + * addresses that each reference a block of deferred frees. + * + * At runtime, this list includes the txnReleaseTime when the allocation was + * "freed". + * + * The release times stored are the last commit time at the time of the + * original free request. The current transaction environment covers a window + * of first release time and the current commit time. When the stored release + * time is before the current first release time then that allocation can + * be safely freed. + * + * When a store is opened this window is not needed and any deferred frees can + * be immediately freed since no transactions will be active. */ public class RWStore implements IStore { @@ -275,9 +320,44 @@ * significant contention may be avoided. */ final private ReentrantLock m_allocationLock = new ReentrantLock(); + + /** + * This lock controls access to the deferredFree structures used + * in deferFree. + * + * The deferral of freeing storage supports processing of read-only + * transactions concurrent with modifying/mutation tasks + */ + final private ReentrantLock m_deferFreeLock = new ReentrantLock(); - private ReopenFileChannel m_reopener = null; + /** + * The deferredFreeList is simply an array of releaseTime,freeListAddrs + * stored at commit. + * + * Note that when the deferredFreeList is saved, ONLY thefreeListAddrs + * are stored, NOT the releaseTime. This is because on any open of + * the store, all deferredFrees can be released immediately. This + * mechanism may be changed in the future to enable explicit history + * retention, but if so a different header structure would be used since + * it would not be appropriate to retain a simple header linked to + * thousands if not milions of commit points. + * + * If the current txn list exceeds the MAX_DEFERRED_FREE then it is + * incrementally saved and a new list begun. The master list itself + * serves as a BLOB header when there is more than a single entry with + * the same txReleaseTime. + */ + private static final int MAX_DEFERRED_FREE = 4094; // fits in 16k block + final ArrayList<Long> m_deferredFreeList = new ArrayList<Long>(); + volatile int m_deferredFreeListAddr = 0; + volatile int m_deferredFreeListEntries = 0; + volatile long m_lastTxReleaseTime = 0; + final ArrayList<Integer> m_currentTxnFreeList = new ArrayList<Integer>(); + + volatile long deferredFreeCount = 0; + private ReopenFileChannel m_reopener = null; + class WriteCacheImpl extends WriteCache.FileChannelScatteredWriteCache { public WriteCacheImpl(final ByteBuffer buf, final boolean useChecksum, @@ -304,6 +384,12 @@ } } + + // Added to enable debug of rare problem + // FIXME: disable by removal once solved + protected void registerWriteStatus(long offset, int length, char action) { + m_writeCache.debugAddrs(offset, length, action); + } }; @@ -359,7 +445,8 @@ } int buffers = m_fmv.getFileMetadata().writeCacheBufferCount; - // buffers = 10; + log.warn("RWStore using writeCacheService with buffers: " + buffers); + try { m_writeCache = new RWWriteCacheService( buffers, m_raf @@ -546,12 +633,16 @@ FileChannelUtility.readAll(m_reopener, ByteBuffer.wrap(buf), rawmbaddr); final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); + + final int deferredFreeListAddr = strBuf.readInt(); + final int deferredFreeListEntries = strBuf.readInt(); + int allocBlocks = strBuf.readInt(); m_allocSizes = new int[allocBlocks]; for (int i = 0; i < allocBlocks; i++) { m_allocSizes[i] = strBuf.readInt(); } - m_metaBitsSize = metaBitsStore - allocBlocks - 1; + m_metaBitsSize = metaBitsStore - allocBlocks - 3; // allow for deferred free m_metaBits = new int[m_metaBitsSize]; if (log.isInfoEnabled()) { log.info("Raw MetaBitsAddr: " + rawmbaddr); @@ -573,6 +664,8 @@ readAllocationBlocks(); + clearOutstandingDeferrels(deferredFreeListAddr, deferredFreeListEntries); + if (log.isTraceEnabled()) { final StringBuffer str = new StringBuffer(); this.showAllocators(str); @@ -590,6 +683,55 @@ + ", " + m_metaBitsAddr); } + /* + * Called when store is opened to make sure any deferred frees are + * cleared. + * + * Stored persistently is only the list of addresses of blocks to be freed, + * the knowledge of the txn release time does not need to be held persistently, + * this is only relevant for transient state while the RWStore is open. + * + * The deferredCount is the number of entries - integer address and integer + * count at each address + */ + private void clearOutstandingDeferrels(final int deferredAddr, final int deferredCount) { + if (deferredAddr != 0) { + assert deferredCount != 0; + final int sze = deferredCount * 8 + 4; // include spce fo checksum + byte[] buf = new byte[sze]; + getData(deferredAddr, buf); + + final byte[] blockBuf = new byte[8 * 1024]; // maximum size required + + ByteBuffer in = ByteBuffer.wrap(buf); + for (int i = 0; i < deferredCount; i++) { + int blockAddr = in.getInt(); + int addrCount = in.getInt(); + + // now read in this block and free all addresses referenced + getData(blockAddr, blockBuf, 0, addrCount*4 + 4); + ByteBuffer inblock = ByteBuffer.wrap(blockBuf); + for (int b = 0; b < addrCount; b++) { + final int defAddr = inblock.getInt(); + Allocator alloc = getBlock(defAddr); + if (alloc instanceof BlobAllocator) { + b++; + assert b < addrCount; + alloc.free(defAddr, inblock.getInt()); + } else { + alloc.free(defAddr, 0); // size ignored for FreeAllocators + } + } + // once read then free the block allocation + free(blockAddr, 0); + } + + // lastly free the deferredAddr + free(deferredAddr, 0); + } + + } + /********************************************************************* * make sure resource is closed! **/ @@ -827,10 +969,13 @@ int chk = ChecksumUtility.getCHK().checksum(buf, offset, length-4); // read checksum int tstchk = bb.getInt(offset + length-4); if (chk != tstchk) { + String cacheDebugInfo = m_writeCache.addrDebugInfo(paddr); log.warn("Invalid data checksum for addr: " + paddr + ", chk: " + chk + ", tstchk: " + tstchk + ", length: " + length + ", first byte: " + buf[0] + ", successful reads: " + m_diskReads - + ", at last extend: " + m_readsAtExtend + ", cacheReads: " + m_cacheReads); + + ", at last extend: " + m_readsAtExtend + ", cacheReads: " + m_cacheReads + + ", writeCacheDebug: " + cacheDebugInfo); + throw new IllegalStateException("Invalid data checksum"); } @@ -908,11 +1053,17 @@ m_allocationLock.lock(); try { final Allocator alloc = getBlockByAddress(addr); - final long pa = alloc.getPhysicalAddress(getOffset(addr)); + final int addrOffset = getOffset(addr); + final long pa = alloc.getPhysicalAddress(addrOffset); alloc.free(addr, sze); // must clear after free in case is a blobHdr that requires reading! + // the allocation lock protects against a concurrent re-allocation + // of the address before the cache has been cleared + assert pa != 0; m_writeCache.clearWrite(pa); m_frees++; + if (alloc.isAllocated(addrOffset)) + throw new IllegalStateException("Reallocation problem with WriteCache"); if (!m_commitList.contains(alloc)) { m_commitList.add(alloc); @@ -920,6 +1071,7 @@ } finally { m_allocationLock.unlock(); } + } /** @@ -991,13 +1143,13 @@ addr = allocator.alloc(this, size); - if (log.isDebugEnabled()) - log.debug("New FixedAllocator for " + cmp + " byte allocations at " + addr); + if (log.isTraceEnabled()) + log.trace("New FixedAllocator for " + cmp + " byte allocations at " + addr); m_allocs.add(allocator); } else { // Verify free list only has allocators with free bits - { + if (log.isDebugEnabled()){ int tsti = 0; Iterator<Allocator> allocs = list.iterator(); while (allocs.hasNext()) { @@ -1209,10 +1361,20 @@ * @throws IOException */ private void writeMetaBits() throws IOException { - final int len = 4 * (1 + m_allocSizes.length + m_metaBits.length); + // the metabits is now prefixed by two ints representing the + // deferred frees - the address and number of defer block addresses + // to be found there + final int len = 4 * (2 + 1 + m_allocSizes.length + m_metaBits.length); final byte buf[] = new byte[len]; final FixedOutputStream str = new FixedOutputStream(buf); + + saveDeferrals(); + + str.writeInt(m_deferredFreeListAddr); + str.writeInt(m_deferredFreeListEntries); + + str.writeInt(m_allocSizes.length); for (int i = 0; i < m_allocSizes.length; i++) { str.writeInt(m_allocSizes[i]); @@ -1317,7 +1479,36 @@ // if (m_commitCallback != null) { // m_commitCallback.commitCallback(); // } + + checkDeferredFrees(); + // save deferredFree data + if (m_deferredFreeList.size() > 0) { + final int oldDeferredFreeListAddr = m_deferredFreeListAddr; + // list contains txReleaseTime,addr of addrs to be freed + final int addrs = m_deferredFreeList.size() / 2; + final int addrSize = addrs*8; + byte[] deferBuf = new byte[addrSize]; // each addr is a long + ByteBuffer bb = ByteBuffer.wrap(deferBuf); + for (int i = 0; i < addrs; i++) { + bb.putLong(m_deferredFreeList.get(1 + (i*2))); + } + bb.flip(); + m_deferredFreeListAddr = (int) alloc(deferBuf, addrSize); + m_deferredFreeListEntries = addrs; + final int chk = ChecksumUtility.getCHK().checksum(deferBuf); + try { + m_writeCache.write(physicalAddress(m_deferredFreeListAddr), bb, chk); + } catch (IllegalStateException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + free(oldDeferredFreeListAddr, 0); + } else { + m_deferredFreeListAddr = 0; + m_deferredFreeListEntries = 0; + } // Allocate storage for metaBits long oldMetaBits = m_metaBitsAddr; int oldMetaBitsSize = (m_metaBits.length + m_allocSizes.length + 1) * 4; @@ -1387,6 +1578,28 @@ } /** + * Called prior to commit, so check whether storage can be freed and then + * whether the deferredheader needs to be saved. + */ + private void checkDeferredFrees() { + checkFreeable(); + + this.m_deferredFreeListEntries = m_deferredFreeList.size(); + if (m_deferredFreeListEntries > 0) { + int bufSize = 4 + m_deferredFreeListEntries*8; + byte[] buf = new byte[bufSize]; + ByteBuffer bb = ByteBuffer.wrap(buf); + bb.putInt(m_deferredFreeListEntries); + for (int i = 0; i < m_deferredFreeListEntries; i++) { + bb.putLong(m_deferredFreeList.get(i)); + } + m_deferredFreeListAddr = (int) alloc(buf, bufSize); + } else { + this.m_deferredFreeListAddr = 0; + } + } + + /** * * @return conservative requirement for metabits storage, mindful that the * request to allocate the metabits may require an increase in the @@ -1402,6 +1615,8 @@ int allocBlocks = (8 + commitInts)/8; ints += 9 * allocBlocks; + ints += 2; // for deferredFreeListAddr and size + return ints*4; // return as bytes } @@ -2074,8 +2289,8 @@ long ret = physicalAddress((int) m_metaBitsAddr); ret <<= 16; - // include space for allocSizes - int metaBitsSize = m_metaBits.length + m_allocSizes.length + 1; + // include space for allocSizes and deferred free info + int metaBitsSize = 2 + m_metaBits.length + m_allocSizes.length + 1; ret += metaBitsSize; if (log.isInfoEnabled()) @@ -2317,4 +2532,183 @@ return allocated; } + /* + * Adds the address for later freeing to the deferred free list. + * + * This is maintained in a set of chained 4K buffers, with the head and + * tail referenced from the meta allocation block. + * + * If the allocation is for a BLOB then the sze is also stored + * + * The deferred list is checked on AllocBlock and prior to commit. + * + * There is also a possibility to check for deferral at this point, since + * we are passed both the currentCommitTime - against which this free + * will be deferred and the earliest tx start time against which we + * can check to see if + */ + public void deferFree(int rwaddr, int sze, long currentCommitTime) { + + m_deferFreeLock.lock(); + try { + deferredFreeCount++; + if (currentCommitTime != m_lastTxReleaseTime) { + assert m_lastTxReleaseTime == 0; + + m_lastTxReleaseTime = currentCommitTime; + } + m_currentTxnFreeList.add(rwaddr); + + final Allocator alloc = getBlockByAddress(rwaddr); + if (alloc instanceof BlobAllocator) { + m_currentTxnFreeList.add(sze); + } + if (m_currentTxnFreeList.size() >= MAX_DEFERRED_FREE) { + // Save current list and clear + saveDeferrals(); + + assert m_currentTxnFreeList.size() == 0; + } + + // every so many deferrals, check for free + if (deferredFreeCount % 100 == 0) { + checkFreeable(); + } + } finally { + m_deferFreeLock.unlock(); + } + } + + private void checkFreeable() { + if (transactionService == null) { + return; + } + + try { + transactionService.callWithLock(new Callable<Object>() { + + public Object call() throws Exception { + if (transactionService.getActiveCount() == 0) { + freeAllDeferrals(Long.MAX_VALUE); + } else { + freeAllDeferrals(transactionService.getEarliestTxStartTime()); + } + return null; + } + + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + /** + * Frees all storage deferred against a txn release time less than that + * passed in. + * + * @param txnRelease - the max release time + */ + protected void freeAllDeferrals(long txnRelease) { + final Iterator<Long> defers = m_deferredFreeList.iterator(); + while (defers.hasNext() && defers.next() < txnRelease) { + if (txnRelease != Long.MAX_VALUE) { + System.out.println("Freeing deferrals"); // FIXME remove debug + } + defers.remove(); + freeDeferrals(defers.next()); + defers.remove(); + } + if (m_lastTxReleaseTime < txnRelease) { + final Iterator<Integer> curdefers = m_currentTxnFreeList.iterator(); + while (curdefers.hasNext()) { + final int rwaddr = curdefers.next(); + Allocator alloc = getBlock(rwaddr); + if (alloc instanceof BlobAllocator) { + assert curdefers.hasNext(); + + free(rwaddr, curdefers.next()); + } else { + free(rwaddr, 0); // size ignored for FreeAllocators + } + } + m_currentTxnFreeList.clear(); + } + } + + /** + * Writes the content of currentTxnFreeList to the store. + * + * These are the current buffered frees that have yet been saved into + * a block referenced from the deferredFreeList + * + * @return the address of the deferred addresses saved on the store + */ + private void saveDeferrals() { + int addrCount = m_currentTxnFreeList.size(); + + if (addrCount == 0) { + return; + } + + byte[] buf = new byte[4 * (addrCount+1)]; + ByteBuffer out = ByteBuffer.wrap(buf); + out.putInt(addrCount); + for (int i = 0; i < addrCount; i++) { + out.putInt(m_currentTxnFreeList.get(i)); + } + + // now we've saved it to the store, clear the list + m_currentTxnFreeList.clear(); + + long rwaddr = alloc(buf, buf.length); + + rwaddr <<= 32; + rwaddr += addrCount; + + // Now add the reference of this block + m_deferredFreeList.add(m_lastTxReleaseTime); + m_deferredFreeList.add(rwaddr); + + + + } + + /** + * Provided with the address of a block of addresses to be freed + * @param blockAddr + */ + protected void freeDeferrals(long blockAddr) { + int addr = (int) -(blockAddr >> 32); + int sze = (int) blockAddr & 0xFFFFFF; + + byte[] buf = new byte[sze+4]; // allow for checksum + getData(addr, buf); + final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); + m_allocationLock.lock(); + try { + int addrs = strBuf.readInt(); + while (addrs-- > 0) { + int nxtAddr = strBuf.readInt(); + Allocator alloc = getBlock(nxtAddr); + if (alloc instanceof BlobAllocator) { + assert addrs > 0; // a Blob address MUST have a size + --addrs; + free(nxtAddr, strBuf.readInt()); + } else { + free(nxtAddr, 0); // size ignored for FreeAllocators + } + } + } catch (IOException e) { + throw new RuntimeException("Problem freeing deferrals", e); + } finally { + m_allocationLock.unlock(); + } + } + + private JournalTransactionService transactionService = null; + public void setTransactionService(final JournalTransactionService transactionService) { + this.transactionService = transactionService; + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-09-08 14:46:46
|
Revision: 3517 http://bigdata.svn.sourceforge.net/bigdata/?rev=3517&view=rev Author: martyncutcher Date: 2010-09-08 14:46:40 +0000 (Wed, 08 Sep 2010) Log Message: ----------- add allocation stats output for debug Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-09-08 13:27:18 UTC (rev 3516) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-09-08 14:46:40 UTC (rev 3517) @@ -193,7 +193,7 @@ final int total = m_ints * 32; final int allocBits = getAllocBits(); - return "Addr : " + m_addr + " [" + allocBits + "::" + total + "]"; + return " - start addr : " + RWStore.convertAddr(m_addr) + " [" + allocBits + "::" + total + "]"; } public void addAddresses(final ArrayList addrs, final int rootAddr) { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-09-08 13:27:18 UTC (rev 3516) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-09-08 14:46:40 UTC (rev 3517) @@ -490,7 +490,18 @@ } public void appendShortStats(StringBuffer str) { - str.append("Index: " + m_index + ", address: " + getStartAddr() + ", " + m_size + "\n"); + str.append("Index: " + m_index + ", " + m_size); + + Iterator<AllocBlock> blocks = m_allocBlocks.iterator(); + while (blocks.hasNext()) { + AllocBlock block = blocks.next(); + if (block.m_addr != 0) { + str.append(block.getStats()); + } else { + break; + } + } + str.append("\n"); } public int getAllocatedBlocks() { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-09-08 13:27:18 UTC (rev 3516) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-09-08 14:46:40 UTC (rev 3517) @@ -713,6 +713,9 @@ protected void readAllocationBlocks() throws IOException { assert m_allocs.size() == 0; + + System.out.println("readAllocationBlocks, m_metaBits.length: " + + m_metaBits.length); /** * Allocators are sorted in StartAddress order (which MUST be the order @@ -721,7 +724,8 @@ * the metaAllocation if two allocation blocks were loaded for the same * address (must be two version of same Allocator). * - * Meta-Allocations stored as {int address; int[8] bits} + * Meta-Allocations stored as {int address; int[8] bits}, so each block + * holds 8*32=256 allocation slots of 1K totalling 256K. */ for (int b = 0; b < m_metaBits.length; b += 9) { long blockStart = convertAddr(m_metaBits[b]); @@ -772,6 +776,13 @@ for (int index = 0; index < m_allocs.size(); index++) { ((Allocator) m_allocs.get(index)).setIndex(index); } + + if (false) { + StringBuffer tmp = new StringBuffer(); + showAllocators(tmp); + + System.out.println("Allocators: " + tmp.toString()); + } } /** @@ -1997,9 +2008,10 @@ static boolean tstBit(int[] bits, int bitnum) { int index = bitnum / 32; int bit = bitnum % 32; - + if (index >= bits.length) - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Accessing bit index: " + index + + " of array length: " + bits.length); return (bits[(int) index] & 1 << bit) != 0; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-11-08 11:09:15
|
Revision: 3909 http://bigdata.svn.sourceforge.net/bigdata/?rev=3909&view=rev Author: martyncutcher Date: 2010-11-08 11:09:09 +0000 (Mon, 08 Nov 2010) Log Message: ----------- Handle BlobHeaders larger than maximum fixed allocation - where the header must itself be a BLOB. This is possible with lower allocation settings, eg with 1K max fixed allocators, a 500K BLOB would require a 2K header to hold the 500 fixed allocation references. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-07 12:49:25 UTC (rev 3908) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-08 11:09:09 UTC (rev 3909) @@ -414,7 +414,7 @@ if (m_freeBits++ == 0 && false) { m_freeWaiting = false; m_freeList.add(this); - } else if (m_freeWaiting && m_freeBits == 3000) { + } else if (m_freeWaiting && m_freeBits == m_store.cDefaultFreeBitsThreshold) { m_freeWaiting = false; m_freeList.add(this); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-07 12:49:25 UTC (rev 3908) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-08 11:09:09 UTC (rev 3909) @@ -145,6 +145,8 @@ private PSOutputStream m_next = null; private int m_blobHdrIdx; + + private boolean m_writingHdr = false; private PSOutputStream next() { return m_next; @@ -162,8 +164,12 @@ m_context = context; m_blobThreshold = maxAlloc-4; // allow for checksum - if (m_buf == null || m_buf.length != m_blobThreshold) - m_buf = new byte[m_blobThreshold]; + + final int maxHdrSize = RWStore.BLOB_FIXED_ALLOCS * 4; + final int bufSize = m_blobThreshold > maxHdrSize ? m_blobThreshold : maxHdrSize; + + if (m_buf == null || m_buf.length != bufSize) + m_buf = new byte[bufSize]; reset(); } @@ -199,9 +205,9 @@ throw new IllegalStateException("Writing to saved PSOutputStream"); } - if (m_count == m_blobThreshold) { + if (m_count == m_blobThreshold && !m_writingHdr) { if (m_blobHeader == null) { - m_blobHeader = new int[RWStore.BLOB_FIXED_ALLOCS]; + m_blobHeader = new int[RWStore.BLOB_FIXED_ALLOCS]; // max 16K m_blobHdrIdx = 0; } @@ -305,27 +311,32 @@ int addr = (int) m_store.alloc(m_buf, m_count, m_context); if (m_blobHeader != null) { - m_blobHeader[m_blobHdrIdx++] = addr; - int precount = m_count; - m_count = 0; - try { - writeInt(m_blobHdrIdx); - for (int i = 0; i < m_blobHdrIdx; i++) { - writeInt(m_blobHeader[i]); - } - addr = (int) m_store.alloc(m_buf, m_count, m_context); - - if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count)/m_blobThreshold)) { - throw new IllegalStateException("PSOutputStream.save at : " + addr + ", bytes: "+ m_bytesWritten + ", blocks: " + m_blobHdrIdx + ", last alloc: " + precount); - } - - if (log.isDebugEnabled()) - log.debug("Writing BlobHdrIdx with " + m_blobHdrIdx + " allocations"); - - addr = m_store.registerBlob(addr); // returns handle - } catch (IOException e) { - e.printStackTrace(); - } + try { + m_writingHdr = true; // ensure that header CAN be a BLOB + m_blobHeader[m_blobHdrIdx++] = addr; + int precount = m_count; + m_count = 0; + try { + writeInt(m_blobHdrIdx); + for (int i = 0; i < m_blobHdrIdx; i++) { + writeInt(m_blobHeader[i]); + } + addr = (int) m_store.alloc(m_buf, m_count, m_context); + + if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count)/m_blobThreshold)) { + throw new IllegalStateException("PSOutputStream.save at : " + addr + ", bytes: "+ m_bytesWritten + ", blocks: " + m_blobHdrIdx + ", last alloc: " + precount); + } + + if (log.isDebugEnabled()) + log.debug("Writing BlobHdrIdx with " + m_blobHdrIdx + " allocations"); + + addr = m_store.registerBlob(addr); // returns handle + } catch (IOException e) { + e.printStackTrace(); + } + } finally { + m_writingHdr = false; + } } m_isSaved = true; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-07 12:49:25 UTC (rev 3908) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-08 11:09:09 UTC (rev 3909) @@ -245,6 +245,18 @@ String DEFAULT_META_BITS_SIZE = "9"; + /** + * Defines the number of bits that must be free in a FixedAllocator for + * it to be added to the free list. This is used to ensure a level + * of locality when making large numbers of allocations within a single + * commit. + * <p> + * The value should be >= 1 and <= 5000 + */ + String FREE_BITS_THRESHOLD = RWStore.class.getName() + ".freeBitsThreshold"; + + String DEFAULT_FREE_BITS_THRESHOLD = "300"; + } /* @@ -341,6 +353,12 @@ final int m_minFixedAlloc; /** + * Currently we do not support a Blob header to be a Blob, so the + * maximum possible Blob is ((maxFixed-4) * maxFixed) - 4. + */ + final int m_maxBlobAllocSize; + + /** * This lock is used to exclude readers when the extent of the backing file * is about to be changed. * <p> @@ -477,7 +495,16 @@ m_metaBitsSize = cDefaultMetaBitsSize; - m_metaBits = new int[m_metaBitsSize]; + cDefaultFreeBitsThreshold = Integer.valueOf(fileMetadata.getProperty( + Options.FREE_BITS_THRESHOLD, + Options.DEFAULT_FREE_BITS_THRESHOLD)); + + if (cDefaultFreeBitsThreshold < 1 || cDefaultFreeBitsThreshold > 5000) { + throw new IllegalArgumentException(Options.FREE_BITS_THRESHOLD + + " : Must be between 1 and 5000"); + } + + m_metaBits = new int[m_metaBitsSize]; m_metaTransientBits = new int[m_metaBitsSize]; @@ -556,6 +583,12 @@ m_minFixedAlloc = m_allocSizes[0]*64; } + final int maxBlockLessChk = m_maxFixedAlloc-4; + // set this at blob header references max 4096 fixed allocs + // meaning that header may itself be a blob if max fixed is + // less than 16K + m_maxBlobAllocSize = (BLOB_FIXED_ALLOCS * maxBlockLessChk); + assert m_maxFixedAlloc > 0; m_deferredFreeOut = PSOutputStream.getNew(this, m_maxFixedAlloc, null); @@ -668,7 +701,7 @@ + metaBitsAddr + ", m_commitCounter: " + commitCounter); } - + /** * Should be called where previously initFileSpec was used. * @@ -728,13 +761,14 @@ final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); m_lastDeferredReleaseTime = strBuf.readLong(); + cDefaultMetaBitsSize = strBuf.readInt(); final int allocBlocks = strBuf.readInt(); m_allocSizes = new int[allocBlocks]; for (int i = 0; i < allocBlocks; i++) { m_allocSizes[i] = strBuf.readInt(); } - m_metaBitsSize = metaBitsStore - allocBlocks - 3; // allow for deferred free + m_metaBitsSize = metaBitsStore - allocBlocks - 4; // allow for deferred free m_metaBits = new int[m_metaBitsSize]; if (log.isInfoEnabled()) { log.info("Raw MetaBitsAddr: " + rawmbaddr); @@ -854,10 +888,10 @@ * Meta-Allocations stored as {int address; int[8] bits}, so each block * holds 8*32=256 allocation slots of 1K totaling 256K. */ - for (int b = 0; b < m_metaBits.length; b += 9) { + for (int b = 0; b < m_metaBits.length; b += cDefaultMetaBitsSize) { final long blockStart = convertAddr(m_metaBits[b]); final int startBit = (b * 32) + 32; - final int endBit = startBit + (8*32); + final int endBit = startBit + ((cDefaultMetaBitsSize-1)*32); for (int i = startBit; i < endBit; i++) { if (tstBit(m_metaBits, i)) { final long addr = blockStart + ((i-startBit) * ALLOC_BLOCK_SIZE); @@ -1061,8 +1095,19 @@ + m_maxFixedAlloc); final byte[] hdrbuf = new byte[4 * (nblocks + 1) + 4]; // plus 4 bytes for checksum - final BlobAllocator ba = (BlobAllocator) getBlock((int) addr); - getData(ba.getBlobHdrAddress(getOffset((int) addr)), hdrbuf); // read in header + if (hdrbuf.length > m_maxFixedAlloc) { + if (log.isInfoEnabled()) { + log.info("LARGE BLOB - header is BLOB"); + } + } + + final Allocator na = getBlock((int) addr); + if (! (na instanceof BlobAllocator)) { + throw new IllegalStateException("Invalid Allocator index"); + } + final BlobAllocator ba = (BlobAllocator) na; + final int hdraddr = ba.getBlobHdrAddress(getOffset((int) addr)); + getData(hdraddr, hdrbuf); // read in header - could itself be a blob! final DataInputStream hdrstr = new DataInputStream(new ByteArrayInputStream(hdrbuf)); final int rhdrs = hdrstr.readInt(); if (rhdrs != nblocks) { @@ -1703,13 +1748,16 @@ // used to free the deferedFree allocations. This is used to determine // which commitRecord to access to process the nextbatch of deferred // frees. - final int len = 4 * (2 + 1 + m_allocSizes.length + m_metaBits.length); + // the cDefaultMetaBitsSize is also written since this can now be + // parameterized. + final int len = 4 * (2 + 1 + 1 + m_allocSizes.length + m_metaBits.length); final byte buf[] = new byte[len]; final FixedOutputStream str = new FixedOutputStream(buf); try { str.writeLong(m_lastDeferredReleaseTime); - + str.writeInt(cDefaultMetaBitsSize); + str.writeInt(m_allocSizes.length); for (int i = 0; i < m_allocSizes.length; i++) { str.writeInt(m_allocSizes[i]); @@ -1914,11 +1962,18 @@ /** * @see Options#META_BITS_SIZE */ - private final int cDefaultMetaBitsSize; + private int cDefaultMetaBitsSize; /** * @see Options#META_BITS_SIZE */ volatile private int m_metaBitsSize; + /** + * Package private since is uded by FixedAllocators + * + * @see Options#META_BITS_SIZE + */ + final int cDefaultFreeBitsThreshold; + private int m_metaBits[]; private int m_metaTransientBits[]; // volatile private int m_metaStartAddr; @@ -2075,7 +2130,7 @@ // final int bitsPerBlock = 9 * 32; final int intIndex = bit / 32; // divide 32; - final int addrIndex = (intIndex/9)*9; + final int addrIndex = (intIndex/cDefaultMetaBitsSize)*cDefaultMetaBitsSize; final long addr = convertAddr(m_metaBits[addrIndex]); final int intOffset = bit - ((addrIndex+1) * 32); @@ -2620,8 +2675,8 @@ long ret = physicalAddress((int) m_metaBitsAddr); ret <<= 16; - // include space for allocSizes and deferred free info - final int metaBitsSize = 2 + m_metaBits.length + m_allocSizes.length + 1; + // include space for allocSizes and deferred free info AND cDefaultMetaBitsSize + final int metaBitsSize = 2 + 1 + m_metaBits.length + m_allocSizes.length + 1; ret += metaBitsSize; if (log.isTraceEnabled()) @@ -3225,8 +3280,8 @@ } -// log.warn("Context: ncontexts=" + m_contexts.size() + ", context=" -// + context); + log.warn("Context: ncontexts=" + m_contexts.size() + ", context=" + + context); } @@ -3893,4 +3948,8 @@ } + public int getMaxBlobSize() { + return this.m_maxBlobAllocSize-4; + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-11-09 11:48:28
|
Revision: 3920 http://bigdata.svn.sourceforge.net/bigdata/?rev=3920&view=rev Author: martyncutcher Date: 2010-11-09 11:48:21 +0000 (Tue, 09 Nov 2010) Log Message: ----------- Add version info to MetaBits header, correctly recycle PSOutputStreams, and fix erroneous maxAlloc assertions Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-11-08 23:56:01 UTC (rev 3919) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-11-09 11:48:21 UTC (rev 3920) @@ -56,13 +56,13 @@ } public int alloc(RWStore store, int size, IAllocationContext context) { - assert size > m_store.m_maxFixedAlloc; + assert size > (m_store.m_maxFixedAlloc-4); return 0; } public boolean free(int addr, int sze) { - if (sze < m_store.m_maxFixedAlloc) + if (sze < (m_store.m_maxFixedAlloc-4)) throw new IllegalArgumentException("Unexpected address size"); int alloc = m_store.m_maxFixedAlloc-4; int blcks = (alloc - 1 + sze)/alloc; @@ -103,8 +103,8 @@ } public int getFirstFixedForBlob(int addr, int sze) { - if (sze < m_store.m_maxFixedAlloc) - throw new IllegalArgumentException("Unexpected address size"); + if (sze < (m_store.m_maxFixedAlloc-4)) + throw new IllegalArgumentException("Unexpected address size: " + sze); int alloc = m_store.m_maxFixedAlloc-4; int blcks = (alloc - 1 + sze)/alloc; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-08 23:56:01 UTC (rev 3919) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-09 11:48:21 UTC (rev 3920) @@ -79,27 +79,24 @@ private static PSOutputStream m_poolHead = null; private static PSOutputStream m_poolTail = null; - private static Integer m_lock = new Integer(42); private static int m_streamCount = 0; - public static PSOutputStream getNew(final IStore store, final int maxAlloc, final IAllocationContext context) { - synchronized (m_lock) { - PSOutputStream ret = m_poolHead; - if (ret != null) { - m_streamCount--; - - m_poolHead = ret.next(); - if (m_poolHead == null) { - m_poolTail = null; - } - } else { - ret = new PSOutputStream(); - } + public static synchronized PSOutputStream getNew(final IStore store, final int maxAlloc, final IAllocationContext context) { + PSOutputStream ret = m_poolHead; + if (ret != null) { + m_streamCount--; - ret.init(store, maxAlloc, context); - - return ret; + m_poolHead = ret.next(); + if (m_poolHead == null) { + m_poolTail = null; + } + } else { + ret = new PSOutputStream(); } + + ret.init(store, maxAlloc, context); + + return ret; } /******************************************************************* @@ -110,23 +107,21 @@ * maximum of 10 streams are maintained - adding up to 80K to the * garbage collect copy. **/ - static void returnStream(PSOutputStream stream) { - synchronized (m_lock) { - if (m_streamCount > 10) { - return; - } - - stream.m_count = 0; // avoid overflow - - if (m_poolTail != null) { - m_poolTail.setNext(stream); - } else { - m_poolHead = stream; - } - - m_poolTail = stream; - m_streamCount++; + static synchronized void returnStream(PSOutputStream stream) { + if (m_streamCount > 10) { + return; } + + stream.m_count = 0; // avoid overflow + + if (m_poolTail != null) { + m_poolTail.setNext(stream); + } else { + m_poolHead = stream; + } + + m_poolTail = stream; + m_streamCount++; } private int[] m_blobHeader = null; @@ -161,6 +156,7 @@ void init(IStore store, int maxAlloc, IAllocationContext context) { m_store = store; m_context = context; + m_next = null; m_blobThreshold = maxAlloc-4; // allow for checksum @@ -357,11 +353,6 @@ return m_bytesWritten; } - protected void finalize() throws Throwable { - close(); - super.finalize(); - } - public OutputStream getFilterWrapper(final boolean saveBeforeClose) { return new FilterOutputStream(this) { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-08 23:56:01 UTC (rev 3919) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-09 11:48:21 UTC (rev 3920) @@ -283,7 +283,7 @@ static final int ALLOCATION_SCALEUP = 16; // multiplier to convert allocations based on minimum allocation of 32k static private final int META_ALLOCATION = 8; // 8 * 32K is size of meta Allocation - static final int BLOB_FIXED_ALLOCS = 1024; + static final int BLOB_FIXED_ALLOCS = 2048; // private ICommitCallback m_commitCallback; // // public void setCommitCallback(final ICommitCallback callback) { @@ -760,6 +760,10 @@ final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); + final int storeVersion = strBuf.readInt(); + if (storeVersion != cVersion) { + throw new IllegalStateException("Incompatible RWStore header version"); + } m_lastDeferredReleaseTime = strBuf.readLong(); cDefaultMetaBitsSize = strBuf.readInt(); @@ -768,7 +772,7 @@ for (int i = 0; i < allocBlocks; i++) { m_allocSizes[i] = strBuf.readInt(); } - m_metaBitsSize = metaBitsStore - allocBlocks - 4; // allow for deferred free + m_metaBitsSize = metaBitsStore - allocBlocks - cMetaHdrFields; // allow for header fields m_metaBits = new int[m_metaBitsSize]; if (log.isInfoEnabled()) { log.info("Raw MetaBitsAddr: " + rawmbaddr); @@ -1594,6 +1598,14 @@ throw new RuntimeException("Closed Store?", e); + } finally { + try { + psout.close(); // return stream + } catch (IOException ioe) { + // should not happen, since this should only be + // recycling + log.warn("Unexpected error closing PSOutputStream", ioe); + } } } @@ -1750,11 +1762,12 @@ // frees. // the cDefaultMetaBitsSize is also written since this can now be // parameterized. - final int len = 4 * (2 + 1 + 1 + m_allocSizes.length + m_metaBits.length); + final int len = 4 * (cMetaHdrFields + m_allocSizes.length + m_metaBits.length); final byte buf[] = new byte[len]; final FixedOutputStream str = new FixedOutputStream(buf); try { + str.writeInt(cVersion); str.writeLong(m_lastDeferredReleaseTime); str.writeInt(cDefaultMetaBitsSize); @@ -1941,6 +1954,7 @@ ints += 9 * allocBlocks; ints += 2; // for deferredFreeListAddr and size + ints += 1; // for version return ints*4; // return as bytes } @@ -1960,6 +1974,17 @@ */ /** + * MetaBits HEADER version must be changed when the header or allocator + * serialization changes + * + * Use BCD-style numbering so + * 0x0200 == 2.00 + * 0x0320 == 3.20 + */ + final private int cVersion = 0x0200; + + final private int cMetaHdrFields = 5; // version, deferredFree(long), + /** * @see Options#META_BITS_SIZE */ private int cDefaultMetaBitsSize; @@ -2393,11 +2418,12 @@ str.append("RWStore Allocation Summary\n"); str.append("-------------------------\n"); str.append(padRight("Allocator", 10)); - str.append(padLeft("Slots used", 12)); - str.append(padLeft("available", 12)); - str.append(padLeft("Store used", 14)); - str.append(padLeft("available", 14)); + str.append(padLeft("SlotsUsed", 12)); + str.append(padLeft("reserved", 12)); + str.append(padLeft("StoreUsed", 14)); + str.append(padLeft("reserved", 14)); str.append(padLeft("Usage", 8)); + str.append(padLeft("Store", 8)); str.append("\n"); long treserved = 0; long treservedSlots = 0; @@ -2410,11 +2436,16 @@ final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; tfilled += filled; tfilledSlots += stats[i].m_filledSlots; + } + for (int i = 0; i < stats.length; i++) { + final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; + final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; str.append(padRight("" + stats[i].m_blockSize, 10)); str.append(padLeft("" + stats[i].m_filledSlots, 12) + padLeft("" + stats[i].m_reservedSlots, 12)); str.append(padLeft("" + filled, 14) + padLeft("" + reserved, 14)); str.append(padLeft("" + (reserved==0?0:(filled * 100 / reserved)) + "%", 8)); + str.append(padLeft("" + (treserved==0?0:(reserved * 100 / treserved)) + "%", 8)); str.append("\n"); } str.append(padRight("Totals", 10)); @@ -2704,16 +2735,15 @@ } /** - * Note that the representation of the - * + * The * @return long representation of metaBitsAddr PLUS the size */ public long getMetaBitsAddr() { long ret = physicalAddress((int) m_metaBitsAddr); ret <<= 16; - // include space for allocSizes and deferred free info AND cDefaultMetaBitsSize - final int metaBitsSize = 2 + 1 + m_metaBits.length + m_allocSizes.length + 1; + // include space for version, allocSizes and deferred free info AND cDefaultMetaBitsSize + final int metaBitsSize = cMetaHdrFields + m_metaBits.length + m_allocSizes.length; ret += metaBitsSize; if (log.isTraceEnabled()) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-11-11 09:48:12
|
Revision: 3928 http://bigdata.svn.sourceforge.net/bigdata/?rev=3928&view=rev Author: martyncutcher Date: 2010-11-11 09:48:06 +0000 (Thu, 11 Nov 2010) Log Message: ----------- add BLOB allocator stats, rationalise metaBitsSize calculation and ensure allocationLock held across BLOB registration Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-11-10 21:11:53 UTC (rev 3927) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-11-11 09:48:06 UTC (rev 3928) @@ -40,7 +40,10 @@ private int m_sortAddr; private ArrayList m_freeList; private long m_startAddr; - // @todo javadoc. why 254? + /** + * There are 256 ints in a BlobAllocator, the first is used to provide the + * sortAddr, and the last for the checksum, leaving 254 BlobHdr addresses + */ private int m_freeSpots = 254; public BlobAllocator(final RWStore store, final int sortAddr) { @@ -60,14 +63,14 @@ return false; } - // @todo javadoc. Why is this method a NOP (other than the assert). + /** + * Should not be called directly since the PSOutputStream + * manages the blob allocations. + */ public int alloc(final RWStore store, final int size, final IAllocationContext context) { - assert size > (m_store.m_maxFixedAlloc-4); - - return 0; + throw new UnsupportedOperationException("Blob allocators do not allocate addresses directly"); } - // @todo why does this return false on all code paths? public boolean free(final int addr, final int sze) { if (sze < (m_store.m_maxFixedAlloc-4)) throw new IllegalArgumentException("Unexpected address size"); @@ -102,11 +105,10 @@ m_freeList.add(this); } + return true; } catch (IOException ioe) { throw new RuntimeException(ioe); } - - return false; } public int getFirstFixedForBlob(final int addr, final int sze) { @@ -292,7 +294,7 @@ } } - return 0; + throw new IllegalStateException("BlobAllocator unable to find free slot"); } public int getRawStartAddr() { @@ -308,7 +310,12 @@ } public void appendShortStats(final StringBuilder str, final AllocationStats[] stats) { - str.append("Index: " + m_index + ", address: " + getStartAddr() + ", BLOB\n"); + if (stats == null) { + str.append("Index: " + m_index + ", address: " + getStartAddr() + ", BLOB\n"); + } else { + stats[stats.length-1].m_filledSlots += 254 - m_freeSpots; + stats[stats.length-1].m_reservedSlots += 254; + } } public boolean isAllocated(final int offset) { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-10 21:11:53 UTC (rev 3927) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-11 09:48:06 UTC (rev 3928) @@ -1945,17 +1945,15 @@ * number of metabits. */ private int getRequiredMetaBitsStorage() { - int ints = 1 + m_allocSizes.length; // length prefixed alloc sizes - ints += m_metaBits.length; + int ints = cMetaHdrFields; + ints += m_allocSizes.length + m_metaBits.length; - // need to handle number of modified blocks + // add the maximum number of new metaBits storage that may be + // needed to save the current committed objects final int commitInts = ((32 + m_commitList.size()) / 32); - final int allocBlocks = (8 + commitInts)/8; - ints += 9 * allocBlocks; + final int allocBlocks = (cDefaultMetaBitsSize - 1 + commitInts)/(cDefaultMetaBitsSize-1); + ints += cDefaultMetaBitsSize * allocBlocks; - ints += 2; // for deferredFreeListAddr and size - ints += 1; // for version - return ints*4; // return as bytes } @@ -1983,7 +1981,14 @@ */ final private int cVersion = 0x0200; - final private int cMetaHdrFields = 5; // version, deferredFree(long), + /** + * MetaBits Header + * int version + * long deferredFree + * int defaultMetaBitsSize + * int length of allocation sizes + */ + final private int cMetaHdrFields = 5; /** * @see Options#META_BITS_SIZE */ @@ -2403,10 +2408,13 @@ * number of filled slots | store used */ public void showAllocators(final StringBuilder str) { - final AllocationStats[] stats = new AllocationStats[m_allocSizes.length]; - for (int i = 0; i < stats.length; i++) { + final AllocationStats[] stats = new AllocationStats[m_allocSizes.length+1]; + for (int i = 0; i < stats.length-1; i++) { stats[i] = new AllocationStats(m_allocSizes[i]*64); } + // for BLOBs + stats[stats.length-1] = new AllocationStats(0); + final Iterator<Allocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { Allocator alloc = (Allocator) allocs.next(); @@ -2437,10 +2445,9 @@ tfilled += filled; tfilledSlots += stats[i].m_filledSlots; } - for (int i = 0; i < stats.length; i++) { + for (int i = 0; i < stats.length-1; i++) { final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; - str.append(padRight("" + stats[i].m_blockSize, 10)); str.append(padLeft("" + stats[i].m_filledSlots, 12) + padLeft("" + stats[i].m_reservedSlots, 12)); str.append(padLeft("" + filled, 14) + padLeft("" + reserved, 14)); @@ -2448,7 +2455,12 @@ str.append(padLeft("" + (treserved==0?0:(reserved * 100 / treserved)) + "%", 8)); str.append("\n"); } - str.append(padRight("Totals", 10)); + // lastly some BLOB stats - only interested in used/reserved slots + str.append(padRight("BLOB", 10)); + str.append(padLeft("" + stats[stats.length-1].m_filledSlots, 12) + padLeft("" + stats[stats.length-1].m_reservedSlots, 12)); + str.append("\n"); + + str.append(padRight("Totals", 10)); str.append(padLeft("" + tfilledSlots, 12)); str.append(padLeft("" + treservedSlots, 12)); str.append(padLeft("" + tfilled, 14)); @@ -2831,25 +2843,31 @@ * data from that into the passed byte array. */ public int registerBlob(final int addr) { - BlobAllocator ba = null; - if (m_freeBlobs.size() > 0) { - ba = (BlobAllocator) m_freeBlobs.get(0); + m_allocationLock.lock(); + try { + BlobAllocator ba = null; + if (m_freeBlobs.size() > 0) { + ba = (BlobAllocator) m_freeBlobs.get(0); + } + if (ba == null) { + final Allocator lalloc = (Allocator) m_allocs.get(m_allocs.size() - 1); + final int psa = lalloc.getRawStartAddr(); // previous block + // start address + assert (psa - 1) > m_nextAllocation; + ba = new BlobAllocator(this, psa - 1); + ba.setFreeList(m_freeBlobs); // will add itself to the free list + ba.setIndex(m_allocs.size()); + m_allocs.add(ba); + } + + if (!m_commitList.contains(ba)) { + m_commitList.add(ba); + } + + return ba.register(addr); + } finally { + m_allocationLock.unlock(); } - if (ba == null) { - final Allocator lalloc = (Allocator) m_allocs.get(m_allocs.size()-1); - final int psa = lalloc.getRawStartAddr(); // previous block start address - assert (psa-1) > m_nextAllocation; - ba = new BlobAllocator(this, psa-1); - ba.setFreeList(m_freeBlobs); // will add itself to the free list - ba.setIndex(m_allocs.size()); - m_allocs.add(ba); - } - - if (!m_commitList.contains(ba)) { - m_commitList.add(ba); - } - - return ba.register(addr); } public void addToCommit(final Allocator allocator) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-11 13:45:35
|
Revision: 3930 http://bigdata.svn.sourceforge.net/bigdata/?rev=3930&view=rev Author: thompsonbry Date: 2010-11-11 13:45:29 +0000 (Thu, 11 Nov 2010) Log Message: ----------- Removed several files which are not used in the current implementation. Removed Paths: ------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/LockFile.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSInputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/WriteBlock.java Deleted: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/LockFile.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/LockFile.java 2010-11-11 13:41:06 UTC (rev 3929) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/LockFile.java 2010-11-11 13:45:29 UTC (rev 3930) @@ -1,108 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ - -package com.bigdata.rwstore; - -import java.io.File; - -import org.apache.log4j.Logger; - -import com.bigdata.journal.IJournal; - -public class LockFile { - /** - * Logger. - */ - protected static final Logger log = Logger.getLogger(LockFile.class); - - protected File m_lockfd = null; - private Thread m_lockThread = null; - - public static LockFile create(String lckname) { - LockFile lf = new LockFile(lckname); - - return (lf.m_lockfd != null) ? lf : null; - } - - public LockFile(String lckname) { - try { - log.info("** LockFile request **"); - - m_lockfd = new File(lckname); - if (m_lockfd.exists()) { - log.info("** LockFile exists **"); - if (m_lockfd.lastModified() > (System.currentTimeMillis() - (40 * 1000))) { - log.warn("** CONFLICT - STILL IN USE **"); - - m_lockfd = null; - - return; - } else { - log.info("** Deleting current Lock File **"); - m_lockfd.delete(); - } - } - - File pfile = m_lockfd.getParentFile(); - if (pfile != null) { - pfile.mkdirs(); - } - - m_lockfd.createNewFile(); - - m_lockfd.deleteOnExit(); - - m_lockThread = new Thread() { - public void run() { - while (m_lockfd != null) { - m_lockfd.setLastModified(System.currentTimeMillis()); - - try { - sleep(10 * 1000); - } catch (Throwable e) { - return; - } - } - } - }; - m_lockThread.setDaemon(true); - - m_lockThread.start(); - } catch (Throwable e) { - log.error("LockFile Error", e); - - m_lockfd = null; - } - } - - public void clear() { - if (m_lockfd != null) { - File fd = m_lockfd; - - m_lockfd = null; - - fd.delete(); - } - } -} Deleted: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSInputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSInputStream.java 2010-11-11 13:41:06 UTC (rev 3929) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSInputStream.java 2010-11-11 13:45:29 UTC (rev 3930) @@ -1,298 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ - -package com.bigdata.rwstore; - -import java.io.*; - -/************************************************************************ - * PSOutputStream - * - * Provides stream interface direct to the low-level store. - * - * Retrieved from an IObjectStore to enable output to the store. - * - * The key idea here is that rather than a call like : - * store.realloc(oldAddr, byteOutputStream)=> newAddress - * - * instead : - * store.allocStream(oldAddr)=>PSOutputStream - * - * and then : - * stream.save()=> newAddress - * - * This will enable large data formats to be streamed to the data store, - * where the previous interface would have required that the entire - * resource was loaded into a memory structure first, before being - * written out in a single block to the store. - * - * This new approach will also enable the removal of BLOB allocation - * strategy. Instead, "BLOBS" will be served by linked fixed allocation - * blocks, flushed out by the stream. - * - * A big advantage of this is that BLOB reallocation is now a lot simpler, - * since BLOB storage is simply a potentially large number of 8K blocks. - * - * This also opens up the possibility of a Stream oriented data type, that - * could be used to serve up a variety of data streams. By providing - * relevant interfaces with the client/server system, a server can then - * provide multiple streams to a high number of clients. - * - * To this end, the output stream has a fixed buffer size, and they are recycled - * from a pool of output streams. - * - *@deprecated Unused as of 7/2/2010 - **/ -public class PSInputStream extends InputStream { - - static PSInputStream m_poolHead = null; - static PSInputStream m_poolTail = null; - static Integer m_lock = new Integer(42); - static int m_streamCount = 0; - - static int s_allocStreams = 0; - static int s_returnStreams = 0; - - public static PSInputStream getNew(IStore store, int size) { - synchronized (m_lock) { - s_allocStreams++; - - PSInputStream ret = m_poolHead; - if (ret != null) { - m_streamCount--; - - m_poolHead = ret.next(); - if (m_poolHead == null) { - m_poolTail = null; - } - } else { - ret = new PSInputStream(); - } - - ret.init(store, size); - - return ret; - } - } - - /******************************************************************* - * maintains pool of streams - in a normal situation there will only - * me a single stream continually re-used, but with some patterns - * there could be many streams. For this reason it is worth checking - * that the pool is not maintained at an unnecessaily large value, so - * maximum of 10 streams are maintained - adding upto 80K to the - * garbage collect copy. - **/ - static public void returnStream(PSInputStream stream) { - synchronized (m_lock) { - s_returnStreams++; - - if (m_streamCount > 10) { - return; - } - - if (m_poolTail != null) { - m_poolTail.setNext(stream); - } else { - m_poolHead = stream; - } - - m_poolTail = stream; - m_streamCount++; - } - } - - final int cBufsize = 16 * 1024; - int m_blobThreshold = 0; - byte[] m_buf = new byte[cBufsize]; - int m_headAddr = 0; - int m_count = 0; - int m_cursor = 0; - int m_totalBytes = -1; - int m_totalRead = 0; - IStore m_store; - - private PSInputStream m_next = null; - - private PSInputStream next() { - return m_next; - } - - private void setNext(PSInputStream str) { - m_next = str; - } - - public void close() { - returnStream(this); - } - - /**************************************************************** - * resets private state variables for reuse of stream - **/ - void init(IStore store, int size) { - m_headAddr = 0; - m_count = size; - m_store = store; - m_cursor = 0; - m_blobThreshold = m_store.bufferChainOffset(); - m_totalBytes = -1; - m_totalRead = 0; - } - - - /**************************************************************** - * Returns buffer for initial read - FIX PROTOCOL LATER - **/ - public void setTotalBytes(int totalBytes) { - m_totalBytes = totalBytes; - } - - /**************************************************************** - * Returns buffer for initial read - FIX PROTOCOL LATER - **/ - public byte[] getBuffer() { - return m_buf; - } - - /************************************************************ - * util to ensure negatives don't screw things - **/ - private int makeInt(byte val) { - int ret = val; - - return ret & 0xFF; - } - - /**************************************************************** - * Reads next byte - throws EOFException if none more available - **/ - public int read() throws IOException { - - if (m_totalBytes >=0) { - if (m_totalRead >= m_totalBytes) { - return -1; - } - } - - m_totalRead++; - - if (m_cursor == m_blobThreshold) { - int nextAddr = makeInt(m_buf[m_cursor++]) << 24; - nextAddr += makeInt(m_buf[m_cursor++]) << 16; - nextAddr += makeInt(m_buf[m_cursor++]) << 8; - nextAddr += makeInt(m_buf[m_cursor]); - - m_count = m_store.getDataSize(nextAddr, m_buf); - - m_cursor = 0; - } - - if (m_cursor >= m_count) { - return -1; - } - - int ret = m_buf[m_cursor++]; - return ret & 0xFF; - } - - /**************************************************************** - * Reads next 4 byte integer value - **/ - public int readInt() throws IOException { - int value = read() << 24; - value += read() << 16; - value += read() << 8; - value += read(); - - return value; - } - - public long readLong() throws IOException { - long value = readInt(); - value <<= 32; - - value += readInt(); - - return value; - } - - public synchronized int read(byte b[], int off, int len) throws IOException { - if (len == 0) { - return 0; - } - - if (len <= available()) { - System.arraycopy(m_buf, m_cursor, b, off, len); - m_cursor += len; - m_totalRead += len; - } else { - for (int i = 0; i < len; i++) { - int r = read(); - if (r != -1) { - b[off + i] = (byte) r; - } else { - return i == 0 ? -1 : i; - } - } - } - - return len; - } - - /**************************************************************** - * Space left - until buffer overrun - **/ - public int available() throws IOException { - if (m_count < m_blobThreshold) { - return m_count - m_cursor; - } else { - return m_blobThreshold - m_cursor; - } - } - - /**************************************************************** - * utility method that extracts all data from this stream and - * writes to the output stream - **/ - public int read(OutputStream outstr) throws IOException { - byte b[] = new byte[512]; - - int retval = 0; - - int r = read(b); - while (r == 512) { - outstr.write(b, 0, r); - retval += r; - - r = read(b); - } - - if (r != -1) { - outstr.write(b, 0, r); - retval += r; - } - - return retval; - } - } \ No newline at end of file Deleted: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/WriteBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/WriteBlock.java 2010-11-11 13:41:06 UTC (rev 3929) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/WriteBlock.java 2010-11-11 13:45:29 UTC (rev 3930) @@ -1,223 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ - -package com.bigdata.rwstore; - -import java.io.*; - -public class WriteBlock { - protected static java.util.logging.Logger cat = java.util.logging.Logger.getLogger(WriteBlock.class.getName()); - - int m_total = 0; - RandomAccessFile m_file = null; - - WriteEntry m_head = null; - WriteEntry m_tail = null; // search back from end if better bet in general - - public WriteBlock(RandomAccessFile file) { - m_file = file; - } - - boolean m_directWrite = true; - - public void addWrite(long diskAddr, byte[] buf, int size) { - if (size == 0) { // nowt to do - return; - } - - if (diskAddr < 0) { - throw new Error("addWrite called to negative address! - " + diskAddr); - } - - if (m_directWrite && diskAddr > 100 * 1000 * 1000) { - // only start buffering writes once filesize > 100Mb ? - m_directWrite = false; - } - - if (m_directWrite) { - try { - m_file.seek(diskAddr); - m_file.write(buf, 0, size); - // cat.info("at: " + diskAddr + ", length: " + size); - } catch (Exception e) { - throw new Error("WriteBlock.doWrite : " + m_file + "-" + diskAddr + "-" + buf + "-" + e); - } - } else { - WriteEntry block = new WriteEntry(diskAddr, buf, size); - m_total += size; - - placeEntry(block); - - if (m_total > 50 * 1024) { // should be configured? - flush(); - } - } - } - - /********************************************************** - * find first block whose address is greater than this, and insert before - * - * if block has same address, then update with this buffer/size - **/ - void placeEntry(WriteEntry entry) { - if (m_head == null) { - m_head = entry; - m_tail = entry; - } else { - WriteEntry tail = m_tail; - - while (tail != null) { - if (tail.m_diskAddr < entry.m_diskAddr) { - entry.m_next = tail.m_next; - entry.m_prev = tail; - tail.m_next = entry; - - break; - } - - tail = tail.m_prev; - } - - if (tail == null) { - entry.m_next = m_head; - m_head = entry; - } else if (tail.m_diskAddr == entry.m_diskAddr) { - tail.m_buf = entry.m_buf; // use updated buffer - - return; - } - - if (entry.m_next != null) { - entry.m_next.m_prev = entry; - } else { - m_tail = entry; - } - } - } - - public void flush() { - WriteEntry entry = m_head; - long addr = 0; - while (entry != null) { - entry.doWrite(m_file); - - if (addr == entry.m_diskAddr) { - throw new RuntimeException("WriteBlock.flush : *** DUPLICATE WRITE *** " + addr); - } - - addr = entry.m_diskAddr; - - entry = entry.m_next; - } - - clear(); - } - - public void clear() { - m_head = null; - m_tail = null; - m_total = 0; - } - - public boolean removeWriteToAddr(long addr) { - WriteEntry entry = m_head; - - while (entry != null) { - if (entry.m_diskAddr == addr) { - if (entry.m_prev == null) { - m_head = entry.m_next; - } else { - entry.m_prev.m_next = entry.m_next; - } - - if (entry.m_next == null) { - m_tail = entry.m_prev; - } else { - entry.m_next.m_prev = entry.m_prev; - } - - return true; - } - - entry = entry.m_next; - } - - return false; - } - - static class WriteEntry { - - WriteEntry m_prev = null; - WriteEntry m_next = null; - - long m_diskAddr; - byte[] m_buf = null; - - int m_writeCount = 0; - - WriteEntry(long diskAddr, byte[] buf, int size) { - m_diskAddr = diskAddr; - - if (size > 0) { - m_buf = new byte[size]; - System.arraycopy(buf, 0, m_buf, 0, size); - } - } - - void doWrite(RandomAccessFile file) { - if (m_buf == null) { - cat.warning("WriteEntry:doWrite - with null buffer"); - - return; - } - - if (m_writeCount++ > 0) { - throw new Error("Write Block written more than once: " + m_writeCount); - } - - try { - file.seek(m_diskAddr); - file.write(m_buf); - // cat.warning("at: " + m_diskAddr + ", length: " + m_buf.length); - } catch (Exception e) { - throw new RuntimeException("WriteBlock.doWrite : " + file + "-" + m_diskAddr + "-" + m_buf + "-" + e); - } - } - - public boolean equals(Object obj) { - return m_diskAddr == ((WriteEntry) obj).m_diskAddr; - } - - public int compareTo(Object obj) { - long diskAddr = ((WriteEntry) obj).m_diskAddr; - if (m_diskAddr < diskAddr) { - return -1; - } else if (m_diskAddr > diskAddr) { - return 1; - } else { - return 0; - } - } - } -} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-11 14:07:29
|
Revision: 3934 http://bigdata.svn.sourceforge.net/bigdata/?rev=3934&view=rev Author: thompsonbry Date: 2010-11-11 14:07:22 +0000 (Thu, 11 Nov 2010) Log Message: ----------- Made a few things in AllocBlock private or final. Removed the WriteCacheServiceReference from AllocBlock and FixedAllocator as it was not used. Updated RWStore to reflect the change to the FixedAllocator ctor. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-11-11 13:46:38 UTC (rev 3933) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-11-11 14:07:22 UTC (rev 3934) @@ -26,7 +26,6 @@ import java.util.ArrayList; -import com.bigdata.io.writecache.WriteCacheService; import com.bigdata.rwstore.RWStore.AllocationStats; /** @@ -37,6 +36,10 @@ * to use {@link System#arraycopy(Object, int, Object, int, int)} to copy * the data rather than cloning it. * + * @todo Review the locks held during reads against {@link AllocBlock}. Is it + * possible that we could have updates which are not being made visible to + * readers? + * * @todo change to use long[]s. */ public class AllocBlock { @@ -67,20 +70,20 @@ * Just the newly allocated bits. This will be copied onto {@link #m_commit} * when the current native transaction commits. */ - int m_bits[]; + final int m_bits[]; /** * All of the bits from the commit point on entry to the current native * transaction plus any newly allocated bits. */ int m_transients[]; - /** - * Used to clear an address on the {@link WriteCacheService} if it has been - * freed. - */ - private final RWWriteCacheService m_writeCache; +// /** +// * Used to clear an address on the {@link WriteCacheService} if it has been +// * freed. +// */ +// private final RWWriteCacheService m_writeCache; - AllocBlock(final int addrIsUnused, final int bitSize, final RWWriteCacheService cache) { - m_writeCache = cache; + AllocBlock(final int addrIsUnused, final int bitSize) {//, final RWWriteCacheService cache) { +// m_writeCache = cache; m_ints = bitSize; m_commit = new int[bitSize]; m_bits = new int[bitSize]; @@ -116,16 +119,16 @@ if (!RWStore.tstBit(m_bits, bit)) { throw new IllegalArgumentException("Freeing bit not set"); } - - // Allocation optimization - if bit NOT set in committed memory then - // clear - // the transient bit to permit reallocation within this transaction. - // - // Note that with buffered IO there is also an opportunity to avoid - // output to - // the file by removing any pending write to the now freed address. On - // large - // transaction scopes this may be significant. + + /* + * Allocation optimization - if bit NOT set in committed memory then + * clear the transient bit to permit reallocation within this + * transaction. + * + * Note that with buffered IO there is also an opportunity to avoid + * output to the file by removing any pending write to the now freed + * address. On large transaction scopes this may be significant. + */ RWStore.clrBit(m_bits, bit); if (!RWStore.tstBit(m_commit, bit)) { @@ -190,7 +193,7 @@ return allocBits; } - public String getStats(AllocationStats stats) { + public String getStats(final AllocationStats stats) { final int total = m_ints * 32; final int allocBits = getAllocBits(); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-11 13:46:38 UTC (rev 3933) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-11 14:07:22 UTC (rev 3934) @@ -42,7 +42,7 @@ private static final Logger log = Logger.getLogger(FixedAllocator.class); - final private RWWriteCacheService m_writeCache; +// final private RWWriteCacheService m_writeCache; volatile private int m_freeBits; volatile private int m_freeTransients; @@ -298,7 +298,7 @@ * @param preserveSessionData * @param cache */ - FixedAllocator(final RWStore store, final int size, final RWWriteCacheService cache) { + FixedAllocator(final RWStore store, final int size) {//, final RWWriteCacheService cache) { m_diskAddr = 0; m_store = store; @@ -323,7 +323,7 @@ m_bitSize = 32; } - m_writeCache = cache; +// m_writeCache = cache; // number of blocks in this allocator, bitSize plus 1 for start address final int numBlocks = 255 / (m_bitSize + 1); @@ -335,7 +335,7 @@ */ m_allocBlocks = new ArrayList<AllocBlock>(numBlocks); for (int i = 0; i < numBlocks; i++) { - m_allocBlocks.add(new AllocBlock(0, m_bitSize, m_writeCache)); + m_allocBlocks.add(new AllocBlock(0, m_bitSize));//, cache)); } m_freeTransients = 0; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-11 13:46:38 UTC (rev 3933) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-11 14:07:22 UTC (rev 3934) @@ -915,7 +915,7 @@ while (fixedSize < allocSize) fixedSize = 64 * m_allocSizes[++index]; - allocator = new FixedAllocator(this, allocSize, m_writeCache); + allocator = new FixedAllocator(this, allocSize);//, m_writeCache); freeList = m_freeFixed[index]; } else { @@ -969,7 +969,7 @@ final int allocSize = 64 * m_allocSizes[block]; final FixedAllocator allocator = new FixedAllocator(this, - allocSize, m_writeCache); + allocSize);//, m_writeCache); allocator.setIndex(m_allocs.size()); @@ -1480,7 +1480,7 @@ final ArrayList<FixedAllocator> list = m_freeFixed[i]; if (list.size() == 0) { - allocator = new FixedAllocator(this, block, m_writeCache); + allocator = new FixedAllocator(this, block);//, m_writeCache); allocator.setFreeList(list); allocator.setIndex(m_allocs.size()); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-11 14:39:50
|
Revision: 3935 http://bigdata.svn.sourceforge.net/bigdata/?rev=3935&view=rev Author: thompsonbry Date: 2010-11-11 14:39:43 +0000 (Thu, 11 Nov 2010) Log Message: ----------- Removed three files which were not in use (Config, ICommitCallback, DirectOutputStream). Removed several methods from IStore which were not in use. Updated RWStore to remove the concept of a persistent session and updated FixedAllocator to always act without reference to the concept of a persistent session. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Removed Paths: ------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Config.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/ICommitCallback.java Deleted: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Config.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Config.java 2010-11-11 14:07:22 UTC (rev 3934) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Config.java 2010-11-11 14:39:43 UTC (rev 3935) @@ -1,41 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ - -package com.bigdata.rwstore; - -/************************************************************************************************ - * This is a bit of a cludge, but somehow we need to let the stores know whether they need - * a locak file in this context - typically one is not needed in a web-application context. - **/ -public final class Config { - static boolean m_lockFileNeeded = true; - - public static boolean isLockFileNeeded() { - return m_lockFileNeeded; - } - - public static void setLockFileNeeded(boolean isNeeded) { - m_lockFileNeeded = isNeeded; - } -} Deleted: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectOutputStream.java 2010-11-11 14:07:22 UTC (rev 3934) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectOutputStream.java 2010-11-11 14:39:43 UTC (rev 3935) @@ -1,57 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ - -package com.bigdata.rwstore; - -import java.io.*; - -public class DirectOutputStream extends ByteArrayOutputStream { - - public DirectOutputStream(int size) { - super(size); - } - - public DirectOutputStream() { - } - - public void directWrite(RandomAccessFile file, int size) throws IOException { - file.write(buf, 0, size); - } - - public void directWrite(RandomAccessFile outfile) throws java.io.IOException { - outfile.write(buf, 0, size()); - } - - //------------------------------------------------------------- - - public void directWrite(java.io.OutputStream outstr) throws java.io.IOException { - outstr.write(buf, 0, size()); - } - - //------------------------------------------------------------- - - public void directWrite(java.io.OutputStream outstr, int outSize) throws java.io.IOException { - outstr.write(buf, 0, outSize); - } -} \ No newline at end of file Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-11 14:07:22 UTC (rev 3934) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-11 14:39:43 UTC (rev 3935) @@ -175,9 +175,9 @@ str.writeInt(block.m_bits[i]); } - if (!m_store.isSessionPreserved()) { +// if (!m_store.isSessionPreserved()) { block.m_transients = block.m_bits.clone(); - } +// } /** * If this allocator is shadowed then copy the new committed @@ -187,8 +187,8 @@ assert block.m_saveCommit != null; block.m_saveCommit = block.m_bits.clone(); - } else if (m_store.isSessionPreserved()) { - block.m_commit = block.m_transients.clone(); +// } else if (m_store.isSessionPreserved()) { +// block.m_commit = block.m_transients.clone(); } else { block.m_commit = block.m_bits.clone(); } @@ -201,19 +201,19 @@ str.close(); } - if (!m_store.isSessionPreserved()) { - m_freeBits += m_freeTransients; +// if (!m_store.isSessionPreserved()) { + m_freeBits += m_freeTransients; - // Handle re-addition to free list once transient frees are - // added back - if ((m_freeTransients == m_freeBits) && (m_freeTransients != 0)) { - m_freeList.add(this); - m_freeWaiting = false; - } - - m_freeTransients = 0; + // Handle re-addition to free list once transient frees are + // added back + if ((m_freeTransients == m_freeBits) && (m_freeTransients != 0)) { + m_freeList.add(this); + m_freeWaiting = false; } + m_freeTransients = 0; +// } + return buf; } catch (IOException e) { throw new StorageTerminalError("Error on write", e); Deleted: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/ICommitCallback.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/ICommitCallback.java 2010-11-11 14:07:22 UTC (rev 3934) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/ICommitCallback.java 2010-11-11 14:39:43 UTC (rev 3935) @@ -1,29 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ - -package com.bigdata.rwstore; - -public interface ICommitCallback { - public void CommitCallback(); -} Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java 2010-11-11 14:07:22 UTC (rev 3934) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java 2010-11-11 14:39:43 UTC (rev 3935) @@ -26,258 +26,109 @@ import java.io.File; - -/************************************************************************************************ - * The IStore interface provides persistent file-backed storage. - * It can be used as a standalone utility, but has been primarily designed - * to support the Generic Persistent Object model. - **/ +/** + * The IStore interface provides persistent file-backed storage. It can be used + * as a standalone utility, but has been primarily designed to support the + * Generic Persistent Object model. + */ public interface IStore { - -// /********************************************************************* -// * Provides a link to an object to carryout any additional data updates -// * before the physical commit - used by the GPO object managers for example -// **/ -// public static interface ICommitCallback { -// public void commitCallback(); -// public void commitComplete(); -// } - public boolean isLongAddress(); - // /************************************************************** -// * Registers a commitCallback object. -// * -// * <p>This method may be called more than once, there maybe several -// * such objects.</p> -// * -// * <p>It is used by the GPO object managers to allow them to store -// * index information and other updated data after a commit -// * cycle.</p> +// * called when used as a server, returns whether facility is enabled // **/ -// public void setCommitCallback(ICommitCallback callback); - - /************************************************************** - * called when used as a server, returns whether facility is enabled - **/ - public boolean preserveSessionData(); +// public boolean preserveSessionData(); -// /************************************************************** -// * the filestore may be explicitly limited -// * - useful when testing, it is all too easy to fill a disk -// * -// * <p>the default is 1GB</p> -// * -// * @param size the new max filesize >> 8 -// **/ -// public void setMaxFileSize(int size); - - /************************************************************** - * the lowest level interface should normally not be used directly. + /** + * Writes data on the store. * * @return the allocated address **/ public long alloc(byte buf[], int size, IAllocationContext context); - - /************************************************************** - * frees allocated storage - * - * @param addr the storage address to be freed - **/ + + /** + * Frees allocated storage + * + * @param addr + * the storage address to be freed + */ public void free(long addr, int size); // /************************************************************** -// * Reallocates storage +// * Odd method needed by PSInputStream to fetch data of unknown +// * size into a buffer // * -// * @param oldAddr is the existing address to be freed -// * @return a stream to write to the store +// * <p>Both RWStore and WOStore store data in either explicit or +// * implicit block sizes.</p> +// * +// * @param addr the address of the data in the IStore +// * buf the buffer to store the data in +// * +// * @returns the size of the data copied // **/ -// public PSOutputStream realloc(long oldAddr, int size); -// -// public PSInputStream getData(long value); +// public int getDataSize(long addr, byte buf[]); - /************************************************************** - * Odd method needed by PSInputStream to fetch data of unknown - * size into a buffer - * - * <p>Both RWStore and WOStore store data in either explicit or - * implicit block sizes.</p> - * - * @param addr the address of the data in the IStore - * buf the buffer to store the data in - * - * &returns the size of the data copied - **/ - public int getDataSize(long addr, byte buf[]); - + /** + * Read data of a known size from the store. + * + * @param l + * the address of the data + * @param buf + * the buffer of the size required! + */ + public void getData(long l, byte buf[]); + // /************************************************************** -// * if the caller can be sure of the size, then a more efficient allocation can be made, -// * but the corresponding getData call must also be made with an explicit size. +// * Given a physical address (byte offset on the store), return true +// * if that address could be managed by an allocated block. // * -// * <p>this should not generally be used - but specific objects can exploit this -// * interface for storing special purpose fixed size structures.</p> -// * -// * <p>Note that the Write Once Store will not automatically preserve historical -// * address information if explicit buffers are used.</p> +// * @param a the storage address to be tested // **/ -// public long realloc(long oldaddr, int oldsze, byte buf[]); - - /************************************************************** - * Used to retrieve data of a known size, typically after having - * been allocated using fixed size reallocation. - * - * @param l the address of the data - * @param buf the buffer of the size required! - **/ - public void getData(long l, byte buf[]); - - /************************************************************** - * a debug method that verifies a storage address as active - * - * @param a the storage address to be tested - **/ - public boolean verify(long a); +// public boolean verify(long a); - /*************************************************************************************** - * this supports the core functionality of a WormStore, other stores should return - * zero, indicating no previous versions available - **/ - public long getPreviousAddress(long addr); - - /*************************************************************************************** - * @return whether the address given is a native IStore address - **/ - public boolean isNativeAddress(long value); - -// /*************************************************************************************** -// * the root address enables the store to be self contained! -// * Along with the allocation information to manage the data, the store by default -// * can store and provide a root address to data needed to initialize the system. -// * -// * @param addr the address to be stored as "root" -// **/ -// public void setRootAddr(long addr); -// -// /*************************************************************************************** -// * @return the root address previously set -// **/ -// public long getRootAddr(); +// /** +// * The {@link RWStore} always generates negative address values. +// * +// * @return whether the address given is a native IStore address +// */ +// public boolean isNativeAddress(long value); -// /*************************************************************************************** -// * A utility equivalent to : store.getData(store.getRootAddr()); +// /** +// * useful in debug situations // * -// * @return an InputStream for any data stored at the root address -// **/ -// public PSInputStream getRoot(); +// * @return store allocation and usage statistics +// */ +// public String getStats(boolean full); -// /*************************************************************************************** -// * clears all data from the store. -// **/ -// public void clear(); - - -// /*************************************************************************************** -// * increments the current nested transaction level -// **/ -// public void startTransaction(); -// -// /*************************************************************************************** -// * decrements the current nested transaction level, if the value is reduced to zero then -// * a physical commit is carried out, if the level is already zero, a runtime exception -// * is thrown. -// **/ -// public void commitTransaction(); -// -// /*************************************************************************************** -// * if the transaction level is greater than one, all modifcations are undone, and the -// * transaction level set to zero. -// **/ -// public void rollbackTransaction(); - - /*************************************************************************************** - * does what it says - **/ - public String getVersionString(); - - /*************************************************************************************** - * useful in debug situations - * - * @return store allocation and usage statistics - **/ - public String getStats(boolean full); - + /** + * Close the file. + */ public void close(); - /*************************************************************************************** - * Needed by PSOutputStream for BLOB buffer chaining. - **/ - public int bufferChainOffset(); - -// public void absoluteWriteLong(long addr, int threshold, long value); -// -// /*************************************************************************************** +// /** // * Needed by PSOutputStream for BLOB buffer chaining. -// **/ -// public void absoluteWriteInt(int addr, int offset, int value); -// -// /*************************************************************************************** -// * Needed to free Blob chains. -// **/ -// public int absoluteReadInt(int addr, int offset); -// -// /*************************************************************************************** -// * Needed to free Blob chains. -// **/ -// public int absoluteReadLong(long addr, int offset); - -// /*************************************************************************************** -// * copies the store to a new file, this is not necessarily a byte for byte copy -// * since the store could write only consolidated data - particulalry relevant for the -// * Write Once store. -// * -// * @param filename specifies the file to be copied to. -// **/ -// public void backup(String filename) throws FileNotFoundException, IOException; -// -// /*************************************************************************************** -// * copies the store to a new file, this is not necessarily a byte for byte copy -// * since the store could write only consolidated data - particulalry relevant for the -// * Write Once store. -// * -// * @param outstr specifies stream to be copied to. -// **/ -// public void backup(OutputStream outstr) throws IOException; -// -// /*************************************************************************************** -// * useful in deployed web services to be able to restore a previously backed-up -// * store. Can also be useful to copy databases, for example, when running -// * a test system that can be simply restored to a backup extracted from a live system. -// * -// * @param instr specifies stream to be restored from. -// **/ -// public void restore(InputStream instr) throws IOException; +// */ +// public int bufferChainOffset(); - /********************************************************************************************* - * Retrieves store file. - * Can be used to delete the store after the IStore has been released - * @return the File object - **/ + /** + * Retrieves store file. Can be used to delete the store after the IStore + * has been released + * + * @return the File object + */ public File getStoreFile(); -// public void absoluteWriteAddress(long addr, int threshold, long addr2); - - public int getAddressSize(); - /** - * Called by the PSOutputStream to register the header bloc of a blob. The store - * must return a new address that is used to retrieve the blob header. This double - * indirection is required to be able to manage the blobs, since the blob header - * itself is of variable size and is handled by the standard FixedAllocators in the - * RWStore. For a WORM implementation the address of the blob header can be returned - * directly + * Called by the PSOutputStream to register the header block of a blob. The + * store must return a new address that is used to retrieve the blob header. + * This double indirection is required to be able to manage the blobs, since + * the blob header itself is of variable size and is handled by the standard + * FixedAllocators in the RWStore. * * @param addr - * @return + * The address of the header block of the blob. + * + * @return The */ public int registerBlob(int addr); + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-11 14:07:22 UTC (rev 3934) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-11 14:39:43 UTC (rev 3935) @@ -36,10 +36,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -300,24 +298,20 @@ // protected int m_transactionCount; // private boolean m_committing; - /** - * When <code>true</code> the allocations will not actually be recycled - * until after a store restart. When <code>false</code>, the allocations are - * recycled once they satisfy the history retention requirement. - * - * FIXME Should this go away or be raised as an option for unlimited - * retention until restart? - */ - private boolean m_preserveSession = false; +// /** +// * When <code>true</code> the allocations will not actually be recycled +// * until after a store restart. When <code>false</code>, the allocations are +// * recycled once they satisfy the history retention requirement. +// */ +// private boolean m_preserveSession = false; // private boolean m_readOnly; - /** - * lists of total alloc blocks. - * - * @todo examine concurrency and lock usage for {@link #m_alloc}, which is - * used by {@link #getStats(boolean)}, and the rest of these lists as - * well. - */ + /** + * lists of total alloc blocks. + * + * @todo examine concurrency and lock usage for {@link #m_alloc} and the + * rest of these lists. + */ private final ArrayList<Allocator> m_allocs; /** lists of free alloc blocks. */ @@ -1295,46 +1289,46 @@ return out.toString(); } - /** - * FIXME: This method is not currently used with BigData, if needed then - * the address mangling needs re-working - */ - public int getDataSize(long addr, byte buf[]) { - throw new UnsupportedOperationException(); - -// synchronized (this) { -// m_writes.flush(); -// -// if (addr == 0) { -// return 0; -// } -// -// try { -// int size = addr2Size((int) addr); -// synchronized (m_raf) { -//// m_raf.seek(physicalAddress((int) addr)); -//// m_raf.readFully(buf, 0, size); -// m_raf.getChannel().read(ByteBuffer.wrap(buf, 0, size), physicalAddress((int) addr)); -// } -// -// return size; -// } catch (IOException e) { -// throw new StorageTerminalError("Unable to read data", e); -// } -// } - } +// /** +// * FIXME: This method is not currently used with BigData, if needed then +// * the address mangling needs re-working +// */ +// public int getDataSize(long addr, byte buf[]) { +// throw new UnsupportedOperationException(); +// +//// synchronized (this) { +//// m_writes.flush(); +//// +//// if (addr == 0) { +//// return 0; +//// } +//// +//// try { +//// int size = addr2Size((int) addr); +//// synchronized (m_raf) { +////// m_raf.seek(physicalAddress((int) addr)); +////// m_raf.readFully(buf, 0, size); +//// m_raf.getChannel().read(ByteBuffer.wrap(buf, 0, size), physicalAddress((int) addr)); +//// } +//// +//// return size; +//// } catch (IOException e) { +//// throw new StorageTerminalError("Unable to read data", e); +//// } +//// } +// } - /** - * Always returns ZERO (0L). - * <p> - * This is intended to support the core functionality of a WormStore, other - * stores should return zero, indicating no previous versions available - */ - public long getPreviousAddress(final long laddr) { - - return 0; - - } +// /** +// * Always returns ZERO (0L). +// * <p> +// * This is intended to support the core functionality of a WormStore, other +// * stores should return zero, indicating no previous versions available +// */ +// public long getPreviousAddress(final long laddr) { +// +// return 0; +// +// } public void free(final long laddr, final int sze) { @@ -1795,12 +1789,12 @@ } } - static final float s_version = 3.0f; +// static final float s_version = 3.0f; +// +// public String getVersionString() { +// return "RWStore " + s_version; +// } - public String getVersionString() { - return "RWStore " + s_version; - } - public void commitChanges(final Journal journal) { assertOpen(); checkCoreAllocations(); @@ -2361,35 +2355,35 @@ return -1; } - // -------------------------------------------------------------------------------------- - private String allocListStats(final List<Allocator> list, final AtomicLong counter) { - final StringBuffer stats = new StringBuffer(); - final Iterator<Allocator> iter = list.iterator(); - while (iter.hasNext()) { - stats.append(iter.next().getStats(counter)); - } - - return stats.toString(); - } - - public String getStats(final boolean full) { - - final AtomicLong counter = new AtomicLong(); - - final StringBuilder sb = new StringBuilder("FileSize : " + m_fileSize - + " allocated : " + m_nextAllocation + "\r\n"); - - if (full) { - - sb.append(allocListStats(m_allocs, counter)); - - sb.append("Allocated : " + counter); - - } - - return sb.toString(); - - } +// // -------------------------------------------------------------------------------------- +// private String allocListStats(final List<Allocator> list, final AtomicLong counter) { +// final StringBuffer stats = new StringBuffer(); +// final Iterator<Allocator> iter = list.iterator(); +// while (iter.hasNext()) { +// stats.append(iter.next().getStats(counter)); +// } +// +// return stats.toString(); +// } +// +// public String getStats(final boolean full) { +// +// final AtomicLong counter = new AtomicLong(); +// +// final StringBuilder sb = new StringBuilder("FileSize : " + m_fileSize +// + " allocated : " + m_nextAllocation + "\r\n"); +// +// if (full) { +// +// sb.append(allocListStats(m_allocs, counter)); +// +// sb.append("Allocated : " + counter); +// +// } +// +// return sb.toString(); +// +// } public static class AllocationStats { public AllocationStats(final int i) { @@ -2510,6 +2504,13 @@ // -------------------------------------------------------------------------------------- + /** + * Given a physical address (byte offset on the store), return true if that + * address could be managed by an allocated block. + * + * @param a + * the storage address to be tested. + */ public boolean verify(final long laddr) { final int addr = (int) laddr; @@ -2594,28 +2595,33 @@ // } // } + /** + * The {@link RWStore} always generates negative address values. + * + * @return whether the address given is a native IStore address + */ public boolean isNativeAddress(final long addr) { return addr <= 0; } - /******************************************************************************* - * called when used as a server, returns whether facility is enabled, this - * is the whole point of the wormStore - so the answer is true - **/ - public boolean preserveSessionData() { - m_preserveSession = true; +// /******************************************************************************* +// * called when used as a server, returns whether facility is enabled, this +// * is the whole point of the wormStore - so the answer is true +// **/ +// public boolean preserveSessionData() { +// m_preserveSession = true; +// +// return true; +// } +// +// /******************************************************************************* +// * called by allocation blocks to determine whether they can re-allocate +// * data within this session. +// **/ +// protected boolean isSessionPreserved() { +// return m_preserveSession || m_contexts.size() > 0; +// } - return true; - } - - /******************************************************************************* - * called by allocation blocks to determine whether they can re-allocate - * data within this session. - **/ - protected boolean isSessionPreserved() { - return m_preserveSession || m_contexts.size() > 0; - } - // /********************************************************************* // * create backup file, copy data to it, and close it. // **/ @@ -2702,21 +2708,21 @@ // } // } - /*************************************************************************************** - * Needed by PSOutputStream for BLOB buffer chaining. - **/ - public int bufferChainOffset() { - return m_maxFixedAlloc - 4; - } +// /*************************************************************************************** +// * Needed by PSOutputStream for BLOB buffer chaining. +// **/ +// public int bufferChainOffset() { +// return m_maxFixedAlloc - 4; +// } public File getStoreFile() { return m_fd; } - public boolean isLongAddress() { - // always ints - return false; - } +// public boolean isLongAddress() { +// // always ints +// return false; +// } // public int absoluteReadLong(long addr, int offset) { // throw new UnsupportedOperationException(); @@ -2730,9 +2736,9 @@ // absoluteWriteInt((int) addr, threshold, (int) addr2); // } - public int getAddressSize() { - return 4; - } +// public int getAddressSize() { +// return 4; +// } // public RandomAccessFile getRandomAccessFile() { // return m_raf; @@ -2838,7 +2844,7 @@ /** * A Blob Allocator maintains a list of Blob headers. The allocator stores - * upto 255 blob headers plus a checksum. When a request is made to read the + * up to 255 blob headers plus a checksum. When a request is made to read the * blob data, the blob allocator retrieves the blob header and reads the * data from that into the passed byte array. */ @@ -2851,8 +2857,8 @@ } if (ba == null) { final Allocator lalloc = (Allocator) m_allocs.get(m_allocs.size() - 1); - final int psa = lalloc.getRawStartAddr(); // previous block - // start address + // previous block start address + final int psa = lalloc.getRawStartAddr(); assert (psa - 1) > m_nextAllocation; ba = new BlobAllocator(this, psa - 1); ba.setFreeList(m_freeBlobs); // will add itself to the free list This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-11-12 17:17:40
|
Revision: 3942 http://bigdata.svn.sourceforge.net/bigdata/?rev=3942&view=rev Author: martyncutcher Date: 2010-11-12 17:17:33 +0000 (Fri, 12 Nov 2010) Log Message: ----------- Remove BlobAllocator and handle deferFree Blobs with addressing sign convention Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Removed Paths: ------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java Deleted: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-11-12 16:33:28 UTC (rev 3941) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-11-12 17:17:33 UTC (rev 3942) @@ -1,343 +0,0 @@ -package com.bigdata.rwstore; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.log4j.Logger; - -import com.bigdata.rwstore.RWStore.AllocationStats; -import com.bigdata.util.ChecksumUtility; - -/** - * BlobAllocator. - * - * Manages Blob allocations using a list of {@link FixedAllocator}s. - * - * The main advantage of this is for re-allocation, since the - * {@link FixedAllocator}s can be efficiently re-cycled where a fixed Blob - * creates issues of best fit and fragmentation. - * - * Some simple patterns would cause un-reallocatable storage, consider a Blob - * that always re-allocated to a larger size, or a pattern where several blobs - * got larger together, in these scenarios, smaller allocations would never be - * re-used, whilst the mechanism of component based allocation is easily - * re-used. - * - * @author mgc - */ -public class BlobAllocator implements Allocator { - - private static final transient Logger log = Logger.getLogger(BlobAllocator.class); - - final private int[] m_hdrs = new int[254]; - final private RWStore m_store; - private int m_diskAddr; - private int m_index; - private int m_sortAddr; - private ArrayList m_freeList; - private long m_startAddr; - /** - * There are 256 ints in a BlobAllocator, the first is used to provide the - * sortAddr, and the last for the checksum, leaving 254 BlobHdr addresses - */ - private int m_freeSpots = 254; - - public BlobAllocator(final RWStore store, final int sortAddr) { - m_store = store; - m_sortAddr = sortAddr; - - if (log.isInfoEnabled()) - log.info("New BlobAllocator"); - } - - public void addAddresses(final ArrayList addrs) { - // not relevant for BlobAllocators - } - - public boolean addressInRange(final int addr) { - // not relevant for BlobAllocators - return false; - } - - /** - * Should not be called directly since the PSOutputStream - * manages the blob allocations. - */ - public int alloc(final RWStore store, final int size, final IAllocationContext context) { - throw new UnsupportedOperationException("Blob allocators do not allocate addresses directly"); - } - - public boolean free(final int addr, final int sze) { - if (sze < (m_store.m_maxFixedAlloc-4)) - throw new IllegalArgumentException("Unexpected address size"); - final int alloc = m_store.m_maxFixedAlloc-4; - final int blcks = (alloc - 1 + sze)/alloc; - - int hdr_idx = (-addr) & RWStore.OFFSET_BITS_MASK; - if (hdr_idx > m_hdrs.length) - throw new IllegalArgumentException("free BlobAllocation problem, hdr offset: " + hdr_idx + ", avail:" + m_hdrs.length); - - final int hdr_addr = m_hdrs[hdr_idx]; - - if (hdr_addr == 0) { - return false; - } - - // read in header block, then free each reference - final byte[] hdr = new byte[(blcks+1) * 4 + 4]; // add space for checksum - m_store.getData(hdr_addr, hdr); - - final DataInputStream instr = new DataInputStream( - new ByteArrayInputStream(hdr, 0, hdr.length-4) ); - try { - final int allocs = instr.readInt(); - for (int i = 0; i < allocs; i++) { - final int nxt = instr.readInt(); - m_store.free(nxt, m_store.m_maxFixedAlloc); - } - m_store.free(hdr_addr, hdr.length); - m_hdrs[hdr_idx] = 0; - if (m_freeSpots++ == 0) { - m_freeList.add(this); - } - - return true; - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - - public int getFirstFixedForBlob(final int addr, final int sze) { - if (sze < (m_store.m_maxFixedAlloc-4)) - throw new IllegalArgumentException("Unexpected address size: " + sze); - - final int alloc = m_store.m_maxFixedAlloc-4; - final int blcks = (alloc - 1 + sze)/alloc; - - final int hdr_idx = (-addr) & RWStore.OFFSET_BITS_MASK; - if (hdr_idx > m_hdrs.length) - throw new IllegalArgumentException("free BlobAllocation problem, hdr offset: " + hdr_idx + ", avail:" + m_hdrs.length); - - final int hdr_addr = m_hdrs[hdr_idx]; - - if (hdr_addr == 0) { - throw new IllegalArgumentException("getFirstFixedForBlob called with unallocated address"); - } - - // read in header block, then free each reference - final byte[] hdr = new byte[(blcks+1) * 4 + 4]; // add space for checksum - m_store.getData(hdr_addr, hdr); - - final DataInputStream instr = new DataInputStream( - new ByteArrayInputStream(hdr, 0, hdr.length-4) ); - try { - final int nallocs = instr.readInt(); - final int faddr = instr.readInt(); - - return faddr; - - } catch (IOException ioe) { - throw new RuntimeException("Unable to retrieve first fixed address", ioe); - } - } - - public int getBlockSize() { - // Not relevant for Blobs - return 0; - } - - public int getDiskAddr() { - return m_diskAddr; - } - - /** - * returns physical address of blob header if any. - */ - public long getPhysicalAddress(final int offset) { - return m_store.physicalAddress(m_hdrs[offset]); - } - - /** - * Since the Blob Allocator simply manages access to FixedAllocation blocks it does not manage any - * allocations directly. - */ - public int getPhysicalSize(final int offset) { - return 0; - } - - /** - * The startAddr - */ - public long getStartAddr() { - // not relevant for blob - return RWStore.convertAddr(m_sortAddr); - } - - public String getStats(final AtomicLong counter) { - return ""; - } - - /** - * hasFree if there are any non-zero entries in the m_hdr array; - */ - public boolean hasFree() { - return m_freeSpots > 0; - } - - public void preserveSessionData() { - // all data held by fixed allocators - } - - /** - * FIXME: There is a symmetry problem with read/write where one takes a Stream and the other - * return a byte[]. This is problematical with using the checksums. - */ - public void read(final DataInputStream str) { - m_freeSpots = 0; - try { - for (int i = 0; i < 254; i++) { - m_hdrs[i] = str.readInt(); - if (m_hdrs[i] == 0) m_freeSpots++; - } - final int chk = str.readInt(); - // checksum int chk = ChecksumUtility.getCHK().checksum(buf, str.size()); - - } catch (IOException e) { - log.error(e,e); - throw new IllegalStateException(e); - } - } - - public void setDiskAddr(final int addr) { - m_diskAddr = addr; - } - - public void setFreeList(final ArrayList list) { - m_freeList = list; - - if (hasFree()) { - m_freeList.add(this); - } - } - - /** - * setIndex is called in two places, firstly to set the original index and secondly on restore - * from storage to re-establish the order. - * - * When called initially, the m_startAddr will be zero and so must be set by retrieving the - * m_startAddr of the previous block (if any). Now, since a Blob must use fixed allocations we - * are guaranteed that a BlobAllocator will not be the first allocator. To derive a startAddr that - * can safely be used to sort a BlobAllocator against the previous (and subsequent) allocators we - * access the previous allocators address. - */ - public void setIndex(final int index) { - m_index = index; - } - - // @todo why is this a NOP? Javadoc. - public boolean verify(final int addr) { - // TODO Auto-generated method stub - return false; - } - - public byte[] write() { - try { - final byte[] buf = new byte[1024]; // @todo why this const? - final DataOutputStream str = new DataOutputStream(new FixedOutputStream(buf)); - - str.writeInt(m_sortAddr); - - for (int i = 0; i < 254; i++) { // @todo why this const? - str.writeInt(m_hdrs[i]); - } - - // add checksum - final int chk = ChecksumUtility.getCHK().checksum(buf, str.size()); - str.writeInt(chk); - - return buf; - } catch (IOException ioe) { - throw new IllegalStateException(ioe); - } - } - - public int compareTo(final Object o) { - final Allocator alloc = (Allocator) o; - - assert getStartAddr() != alloc.getStartAddr(); - - return (getStartAddr() < alloc.getStartAddr()) ? -1 : 1; - } - - public int register(final int addr) { - assert m_freeSpots > 0; - - m_store.addToCommit(this); - - for (int i = 0; i < 254; i++) { - if (m_hdrs[i] == 0) { - m_hdrs[i] = addr; - - if (--m_freeSpots == 0) { - m_freeList.remove(this); - } - - final int ret = -((m_index << RWStore.OFFSET_BITS) + i); - if (((-ret) & RWStore.OFFSET_BITS_MASK) > m_hdrs.length) - throw new IllegalStateException("Invalid blob offset: " + ((-ret) & RWStore.OFFSET_BITS_MASK)); - - return ret; - } - } - - throw new IllegalStateException("BlobAllocator unable to find free slot"); - } - - public int getRawStartAddr() { - return m_sortAddr; - } - - public int getIndex() { - return m_index; - } - - public int getBlobHdrAddress(final int hdrIndex) { - return m_hdrs[hdrIndex]; - } - - public void appendShortStats(final StringBuilder str, final AllocationStats[] stats) { - if (stats == null) { - str.append("Index: " + m_index + ", address: " + getStartAddr() + ", BLOB\n"); - } else { - stats[stats.length-1].m_filledSlots += 254 - m_freeSpots; - stats[stats.length-1].m_reservedSlots += 254; - } - } - - public boolean isAllocated(final int offset) { - return m_hdrs[offset] != 0; - } - - /** - * This is okay as a NOP. The true allocation is managed by the - * FixedAllocators. - */ - public void detachContext(final IAllocationContext context) { - // NOP - } - - /** - * Since the real allocation is in the FixedAllocators, this should delegate - * to the first address, in which case - */ - public boolean canImmediatelyFree(final int addr, final int size, final IAllocationContext context) { - final int faddr = this.getFirstFixedForBlob(addr, size); - - return m_store.getBlockByAddress(faddr).canImmediatelyFree(faddr, 0, context); - } - -} Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java 2010-11-12 16:33:28 UTC (rev 3941) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java 2010-11-12 17:17:33 UTC (rev 3942) @@ -117,18 +117,18 @@ */ public File getStoreFile(); - /** - * Called by the PSOutputStream to register the header block of a blob. The - * store must return a new address that is used to retrieve the blob header. - * This double indirection is required to be able to manage the blobs, since - * the blob header itself is of variable size and is handled by the standard - * FixedAllocators in the RWStore. - * - * @param addr - * The address of the header block of the blob. - * - * @return The - */ - public int registerBlob(int addr); +// /** +// * Called by the PSOutputStream to register the header block of a blob. The +// * store must return a new address that is used to retrieve the blob header. +// * This double indirection is required to be able to manage the blobs, since +// * the blob header itself is of variable size and is handled by the standard +// * FixedAllocators in the RWStore. +// * +// * @param addr +// * The address of the header block of the blob. +// * +// * @return The +// */ +// public int registerBlob(int addr); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-12 16:33:28 UTC (rev 3941) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-12 17:17:33 UTC (rev 3942) @@ -320,7 +320,7 @@ private ArrayList<FixedAllocator> m_freeFixed[]; /** lists of free blob allocators. */ - private final ArrayList<BlobAllocator> m_freeBlobs; + // private final ArrayList<BlobAllocator> m_freeBlobs; /** lists of blocks requiring commitment. */ private final ArrayList<Allocator> m_commitList; @@ -520,7 +520,7 @@ m_allocs = new ArrayList<Allocator>(); - m_freeBlobs = new ArrayList<BlobAllocator>(); + // m_freeBlobs = new ArrayList<BlobAllocator>(); try { final RandomAccessFile m_raf = fileMetadata.getRandomAccessFile(); @@ -915,20 +915,17 @@ final int allocSize = strBuf.readInt(); // if Blob < 0 final Allocator allocator; final ArrayList<? extends Allocator> freeList; - if (allocSize > 0) { - int index = 0; - int fixedSize = m_minFixedAlloc; - while (fixedSize < allocSize) - fixedSize = 64 * m_allocSizes[++index]; + assert allocSize > 0; - allocator = new FixedAllocator(this, allocSize);//, m_writeCache); + int index = 0; + int fixedSize = m_minFixedAlloc; + while (fixedSize < allocSize) + fixedSize = 64 * m_allocSizes[++index]; - freeList = m_freeFixed[index]; - } else { - allocator = new BlobAllocator(this, allocSize); - freeList = m_freeBlobs; - } + allocator = new FixedAllocator(this, allocSize);//, m_writeCache); + freeList = m_freeFixed[index]; + allocator.read(strBuf); allocator.setDiskAddr(i); // store bit, not physical // address! @@ -1757,7 +1754,7 @@ m_commitList.clear(); m_allocs.clear(); - m_freeBlobs.clear(); + // m_freeBlobs.clear(); final int numFixed = m_allocSizes.length; for (int i = 0; i < numFixed; i++) { @@ -2904,34 +2901,34 @@ * blob data, the blob allocator retrieves the blob header and reads the * data from that into the passed byte array. */ - public int registerBlob(final int addr) { - m_allocationLock.lock(); - try { - BlobAllocator ba = null; - if (m_freeBlobs.size() > 0) { - ba = (BlobAllocator) m_freeBlobs.get(0); - } - if (ba == null) { - final Allocator lalloc = (Allocator) m_allocs.get(m_allocs.size() - 1); - // previous block start address - final int psa = lalloc.getRawStartAddr(); - assert (psa - 1) > m_nextAllocation; - ba = new BlobAllocator(this, psa - 1); - ba.setFreeList(m_freeBlobs); // will add itself to the free list - ba.setIndex(m_allocs.size()); - m_allocs.add(ba); - } +// public int registerBlob(final int addr) { +// m_allocationLock.lock(); +// try { +// BlobAllocator ba = null; +// if (m_freeBlobs.size() > 0) { +// ba = (BlobAllocator) m_freeBlobs.get(0); +// } +// if (ba == null) { +// final Allocator lalloc = (Allocator) m_allocs.get(m_allocs.size() - 1); +// // previous block start address +// final int psa = lalloc.getRawStartAddr(); +// assert (psa - 1) > m_nextAllocation; +// ba = new BlobAllocator(this, psa - 1); +// ba.setFreeList(m_freeBlobs); // will add itself to the free list +// ba.setIndex(m_allocs.size()); +// m_allocs.add(ba); +// } +// +// if (!m_commitList.contains(ba)) { +// m_commitList.add(ba); +// } +// +// return ba.register(addr); +// } finally { +// m_allocationLock.unlock(); +// } +// } - if (!m_commitList.contains(ba)) { - m_commitList.add(ba); - } - - return ba.register(addr); - } finally { - m_allocationLock.unlock(); - } - } - public void addToCommit(final Allocator allocator) { if (!m_commitList.contains(allocator)) { m_commitList.add(allocator); @@ -3108,11 +3105,11 @@ public void deferFree(final int rwaddr, final int sze) { m_allocationLock.lock(); try { - m_deferredFreeOut.writeInt(rwaddr); - - final Allocator alloc = getBlockByAddress(rwaddr); - if (alloc instanceof BlobAllocator) { + if (sze > this.m_maxFixedAlloc) { + m_deferredFreeOut.writeInt(-rwaddr); m_deferredFreeOut.writeInt(sze); + } else { + m_deferredFreeOut.writeInt(rwaddr); } } catch (IOException e) { throw new RuntimeException("Could not free: rwaddr=" + rwaddr @@ -3207,12 +3204,11 @@ while (nxtAddr != 0) { // while (false && addrs-- > 0) { - final Allocator alloc = getBlock(nxtAddr); - if (alloc instanceof BlobAllocator) { + if (nxtAddr > 0) { // Blob final int bloblen = strBuf.readInt(); assert bloblen > 0; // a Blob address MUST have a size - immediateFree(nxtAddr, bloblen); + immediateFree(-nxtAddr, bloblen); } else { immediateFree(nxtAddr, 0); // size ignored for FreeAllocators } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-17 13:33:39
|
Revision: 3951 http://bigdata.svn.sourceforge.net/bigdata/?rev=3951&view=rev Author: thompsonbry Date: 2010-11-17 13:33:32 +0000 (Wed, 17 Nov 2010) Log Message: ----------- PSOutputStream - modified to rethrow an exception which was being dumped on the console and ignored. made several method variables final. RWStore - mainly javadoc, including adding a checklist for a release at the top of the file. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-17 02:14:08 UTC (rev 3950) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-17 13:33:32 UTC (rev 3951) @@ -330,23 +330,23 @@ m_writingHdr = true; // ensure that header CAN be a BLOB // m_blobHeader[m_blobHdrIdx++] = addr; m_blobHeader.add(addr); - final int precount = m_count; +// final int precount = m_count; m_count = 0; try { // writeInt(m_blobHdrIdx); // for (int i = 0; i < m_blobHdrIdx; i++) { // writeInt(m_blobHeader[i]); // } - int hdrBufSize = 4*(m_blobHeader.size() + 1); - ByteArrayOutputStream hdrbuf = new ByteArrayOutputStream(hdrBufSize); - DataOutputStream hdrout = new DataOutputStream(hdrbuf); + final int hdrBufSize = 4*(m_blobHeader.size() + 1); + final ByteArrayOutputStream hdrbuf = new ByteArrayOutputStream(hdrBufSize); + final DataOutputStream hdrout = new DataOutputStream(hdrbuf); hdrout.writeInt(m_blobHeader.size()); for (int i = 0; i < m_blobHeader.size(); i++) { hdrout.writeInt(m_blobHeader.get(i)); } hdrout.flush(); - byte[] outbuf = hdrbuf.toByteArray(); + final byte[] outbuf = hdrbuf.toByteArray(); addr = (int) m_store.alloc(outbuf, hdrBufSize, m_context); // if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count) / m_blobThreshold)) { @@ -363,7 +363,8 @@ // DO NOT USE BLOB ALLOCATOR // addr = m_store.registerBlob(addr); // returns handle } catch (IOException e) { - e.printStackTrace(); +// e.printStackTrace(); + throw new RuntimeException(e); } } finally { m_writingHdr = false; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-17 02:14:08 UTC (rev 3950) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-17 13:33:32 UTC (rev 3951) @@ -105,25 +105,24 @@ * <p> * The method of storing the allocation headers has been changed from always * allocating at the end of the file (and moving them on file extend) to - * allocation of fixed areas. The meta-allocation data, containing the bitmap + * allocation of fixed areas. The meta-allocation data, containing the bitmap * that controls these allocations, is itself stored in the heap, and is now * structured to include both the bit data and the list of meta-storage * addresses. * <p> - * Sizing: - * 256 allocators would reference approximately 2M objects/allocations. At 1K - * per allocator this would require 250K of store. The meta-allocation data - * would therefore need a start address plus 32 bytes (or 8 ints) to represent - * the meta-allocation bits. An array of such data referencing sequentially - * allocated storage areas completes the meta-allocation requirements. + * Sizing: 256 allocators would reference approximately 2M objects/allocations. + * At 1K per allocator this would require 250K of store. The meta-allocation + * data would therefore need a start address plus 32 bytes (or 8 ints) to + * represent the meta-allocation bits. An array of such data referencing + * sequentially allocated storage areas completes the meta-allocation + * requirements. * <p> * A meta-allocation address can therefore be represented as a single bit offset - * from which the block, providing start address, and bit offset can be - * directly determined. + * from which the block, providing start address, and bit offset can be directly + * determined. * <p> - * The m_metaBits int array used to be fully used as allocation bits, but - * now stores both the start address plus the 8 ints used to manage that data - * block. + * The m_metaBits int array used to be fully used as allocation bits, but now + * stores both the start address plus the 8 ints used to manage that data block. * <p> * Allocation is reduced to sets of allocator objects which have a start address * and a bitmap of allocated storage maps. @@ -136,9 +135,9 @@ * the BitSet class. * <p> * Using the meta-allocation bits, it is straightforward to load ALL the - * allocation headers. A total of (say) 100 allocation headers might provide - * up to 4000 allocations each -> 400 000 objects, while 1000 headers -> 4m - * objects and 2000 -> 8m objects. + * allocation headers. A total of (say) 100 allocation headers might provide up + * to 4000 allocations each -> 400 000 objects, while 1000 headers -> 4m objects + * and 2000 -> 8m objects. * <p> * The allocators are split into a set of FixedAllocators and then * BlobAllocation. The FixedAllocators will allocate from 128 to 32K objects, @@ -167,28 +166,44 @@ * All data is checksummed, both allocated/saved data and the allocation blocks. * <p> * BLOB allocation is not handled using chained data buffers but with a blob - * header record. This is indicated with a BlobAllocator that provides indexed + * header record. This is indicated with a BlobAllocator that provides indexed * offsets to the header record (the address encodes the BlobAllocator and the * offset to the address). The header record stores the number of component * allocations and the address of each. * <p> * This approach makes for much more efficient freeing/re-allocation of Blob - * storage, in particular avoiding the need to read in the component blocks - * to determine chained blocks for freeing. This is particularly important - * for larger stores where a disk cache could be flushed through simply freeing - * BLOB allocations. + * storage, in particular avoiding the need to read in the component blocks to + * determine chained blocks for freeing. This is particularly important for + * larger stores where a disk cache could be flushed through simply freeing BLOB + * allocations. * <h2> - * Deferred Free List - * </h2> + * Deferred Free List</h2> * <p> * The previous implementation has been amended to associate a single set of - * deferredFree blocks with each CommitRecord. The CommitRecordIndex will - * then provide access to the CommitRecords to support the deferred freeing - * of allocations based on age/earliestTxReleaseTime. + * deferredFree blocks with each CommitRecord. The CommitRecordIndex will then + * provide access to the CommitRecords to support the deferred freeing of + * allocations based on age/earliestTxReleaseTime. * <p> * The last release time processed is held with the MetaAllocation data * * @author Martyn Cutcher + * + * FIXME Release checklist: + * <p> + * Checksum metabits header record? + * <p> + * Checksum fixed allocators? + * <p> + * Checksum delete blocks / blob records? + * <p> + * PSOutputStream - remove caching logic. It is unused and makes this + * class much more complex. A separate per-RWStore caching class for + * recycling PSOutputStreams can be added later. + * <p> + * Modify FixedAllocator to use arrayCopy() rather than clone and + * declare more fields to be final. See notes on {@link AllocBlock}. + * <p> + * Implement logic to "abort" a shadow allocation context. */ public class RWStore implements IStore { @@ -281,8 +296,13 @@ static final int ALLOCATION_SCALEUP = 16; // multiplier to convert allocations based on minimum allocation of 32k static private final int META_ALLOCATION = 8; // 8 * 32K is size of meta Allocation - // Maximum fixed allocs in a BLOB, but do restrict to size that will fit within a single fixed allocation - // Ignored + /** + * Maximum fixed allocs in a BLOB, but do restrict to size that will fit + * within a single fixed allocation Ignored. + * + * FIXME Javadoc. Is this ignored or not? (what is the Ignored doing at the + * end of the comment above?) Is this in units of int32 values or bytes? + */ static final int BLOB_FIXED_ALLOCS = 2048; // private ICommitCallback m_commitCallback; // @@ -751,7 +771,7 @@ if (metaBitsStore > 0) { rawmbaddr >>= 16; - + // RWStore now restore metabits final byte[] buf = new byte[metaBitsStore * 4]; @@ -1063,8 +1083,8 @@ * direct read will be the blob header record. In this case we should hand * over the streaming to a PSInputStream. * - * FIXME: For now we do not use the PSInputStream but instead process - * directly + * FIXME: Javadoc update (was: For now we do not use the PSInputStream but instead process + * directly...) * * If it is a BlobAllocation, then the BlobAllocation address points to the * address of the BlobHeader record. @@ -1299,47 +1319,6 @@ return out.toString(); } -// /** -// * FIXME: This method is not currently used with BigData, if needed then -// * the address mangling needs re-working -// */ -// public int getDataSize(long addr, byte buf[]) { -// throw new UnsupportedOperationException(); -// -//// synchronized (this) { -//// m_writes.flush(); -//// -//// if (addr == 0) { -//// return 0; -//// } -//// -//// try { -//// int size = addr2Size((int) addr); -//// synchronized (m_raf) { -////// m_raf.seek(physicalAddress((int) addr)); -////// m_raf.readFully(buf, 0, size); -//// m_raf.getChannel().read(ByteBuffer.wrap(buf, 0, size), physicalAddress((int) addr)); -//// } -//// -//// return size; -//// } catch (IOException e) { -//// throw new StorageTerminalError("Unable to read data", e); -//// } -//// } -// } - -// /** -// * Always returns ZERO (0L). -// * <p> -// * This is intended to support the core functionality of a WormStore, other -// * stores should return zero, indicating no previous versions available -// */ -// public long getPreviousAddress(final long laddr) { -// -// return 0; -// -// } - public void free(final long laddr, final int sze) { free(laddr, sze, null/* AlocationContext */); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-17 17:32:22
|
Revision: 3952 http://bigdata.svn.sourceforge.net/bigdata/?rev=3952&view=rev Author: thompsonbry Date: 2010-11-17 17:32:16 +0000 (Wed, 17 Nov 2010) Log Message: ----------- Mostly javadoc changes, including documentation for the showAllocators table. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-17 13:33:32 UTC (rev 3951) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-17 17:32:16 UTC (rev 3952) @@ -123,6 +123,13 @@ return m_size; } + /** + * The free list for the allocation slot size serviced by this allocator. + * This is a reference back into the corresponding free list as managed by + * the RWStore. + * + * @see #setFreeList(ArrayList) + */ private ArrayList m_freeList; public void setFreeList(ArrayList list) { @@ -243,7 +250,7 @@ if (block.m_bits[i] == 0) { // empty m_freeBits += 32; } else if (block.m_bits[i] != 0xFFFFFFFF) { // not full - int anInt = block.m_bits[i]; + final int anInt = block.m_bits[i]; for (int bit = 0; bit < 32; bit++) { if ((anInt & (1 << bit)) == 0) { m_freeBits++; @@ -425,9 +432,9 @@ return true; } else if (addr >= m_startAddr && addr < m_endAddr) { - final Iterator iter = m_allocBlocks.iterator(); + final Iterator<AllocBlock> iter = m_allocBlocks.iterator(); while (iter.hasNext()) { - final AllocBlock block = (AllocBlock) iter.next(); + final AllocBlock block = iter.next(); if (block.free(addr, m_size)) { m_freeTransients++; @@ -506,7 +513,8 @@ } public void addAddresses(ArrayList addrs) { - Iterator blocks = m_allocBlocks.iterator(); + + final Iterator blocks = m_allocBlocks.iterator(); // FIXME int baseAddr = -((m_index << 16) + 4); // bit adjust int baseAddr = -(m_index << 16); // bit adjust?? Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-17 13:33:32 UTC (rev 3951) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-17 17:32:16 UTC (rev 3952) @@ -190,12 +190,14 @@ * * FIXME Release checklist: * <p> - * Checksum metabits header record? + * Add metabits header record checksum field and verify on read back. * <p> - * Checksum fixed allocators? + * Checksum fixed allocators (needs to be tested on read back). * <p> - * Checksum delete blocks / blob records? + * Add version field to the fixed allocator. * <p> + * Done. Checksum delete blocks / blob records. + * <p> * PSOutputStream - remove caching logic. It is unused and makes this * class much more complex. A separate per-RWStore caching class for * recycling PSOutputStreams can be added later. @@ -204,6 +206,14 @@ * declare more fields to be final. See notes on {@link AllocBlock}. * <p> * Implement logic to "abort" a shadow allocation context. + * <p> + * Unit test to verify that we do not recycle allocations from the last + * commit point even when the retention time is zero such that it is + * always possible to re-open the store from the alternative root block + * even after you have allocated things against the current root block + * (but not yet committed). + * <p> + * Read-only mode. */ public class RWStore implements IStore { @@ -336,10 +346,18 @@ */ private final ArrayList<Allocator> m_allocs; - /** lists of free alloc blocks. */ + /** + * A fixed length array of lists of free {@link FixedAllocator}s with one + * entry in the array for each configured allocator size. An allocator is + * put onto this free list when it is initially created. When the store is + * opened, it will be added to this list if {@link Allocator#hasFree()} + * returns true. It will be removed when it has no free space remaining. It + * will be added back to the free list when its free slots exceeds a + * configured threshold. + */ private ArrayList<FixedAllocator> m_freeFixed[]; - /** lists of free blob allocators. */ +// /** lists of free blob allocators. */ // private final ArrayList<BlobAllocator> m_freeBlobs; /** lists of blocks requiring commitment. */ @@ -2039,7 +2057,7 @@ private int m_metaTransientBits[]; // volatile private int m_metaStartAddr; private volatile int m_metaBitsAddr; - + // @todo javadoc please. volatile private boolean m_recentAlloc = false; /** @@ -2437,6 +2455,20 @@ * Collected statistics are against each Allocation Block size: * total number of slots | store size * number of filled slots | store used + * <dl> + * <dt>AllocatorSize</dt><dd>The #of bytes in the allocated slots issued by this allocator.</dd> + * <dt>AllocatorCount</dt><dd>The #of fixed allocators for that slot size.</dd> + * <dt>SlotsAllocated</dt><dd>Cumulative allocation of slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsRecycled</dt><dd>Cumulative recycled slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsInUse</dt><dd>The difference between the two previous columns (net slots in use for this slot size).</dd> + * <dt>SlotsReserved</dt><dd>The #of slots in this slot size which have had storage reserved for them.</dd> + * <dt>BytesReserved</dt><dd>The space reserved on the backing file for those allocation slots</dd> + * <dt>BytesAppData</dt><dd>The #of bytes in the allocated slots which are used by application data (including the record checksum).</dd> + * <dt>%SlotWaste</dt><dd>BytesAppData/(SlotsInUse*AllocatorSize) : How well the application data fits in the slots.</dd> + * <dt>%StoreWaste</dt><dd>BytesAppData/BytesReserved : How much of the reserved space is in use by application data.</dd> + * <dt>%AppData</dt><dd>BytesAppData/Sum(BytesAppData). This is how much of your data is stored by each allocator.</dd> + * <dt>%BackingFile</dt><dd>BytesReserved/Sum(BytesReserved). This is how much of the backing file is reserved for each allocator.</dd> + * </dl> */ public void showAllocators(final StringBuilder str) { final AllocationStats[] stats = new AllocationStats[m_allocSizes.length]; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-11-18 14:32:50
|
Revision: 3957 http://bigdata.svn.sourceforge.net/bigdata/?rev=3957&view=rev Author: martyncutcher Date: 2010-11-18 14:32:43 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Integrate StorageStats collection Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-18 14:32:08 UTC (rev 3956) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-18 14:32:43 UTC (rev 3957) @@ -31,6 +31,7 @@ import org.apache.log4j.Logger; import com.bigdata.rwstore.RWStore.AllocationStats; +import com.bigdata.rwstore.StorageStats.Bucket; import com.bigdata.util.ChecksumUtility; /** @@ -52,6 +53,8 @@ */ volatile private int m_diskAddr; volatile private int m_index; + + Bucket m_statsBucket = null; public void setIndex(final int index) { final AllocBlock fb = (AllocBlock) m_allocBlocks.get(0); @@ -109,7 +112,7 @@ final int bit = offset % allocBlockRange; if (RWStore.tstBit(block.m_bits, bit)) { - return RWStore.convertAddr(block.m_addr) + ((long)m_size * bit); + return RWStore.convertAddr(block.m_addr) + ((long) m_size * bit); } else { return 0L; } @@ -428,6 +431,10 @@ } else { m_freeTransients++; } + + if (m_statsBucket != null) { + m_statsBucket.delete(size); + } return true; } else if (addr >= m_startAddr && addr < m_endAddr) { @@ -463,7 +470,11 @@ final AllocBlock block = iter.next(); if (block.m_addr == 0) { - int blockSize = 32 * m_bitSize * m_size; + int blockSize = 32 * m_bitSize; + if (m_statsBucket != null) { + m_statsBucket.addSlots(blockSize); + } + blockSize *= m_size; blockSize >>= RWStore.ALLOCATION_SCALEUP; block.m_addr = store.allocBlock(blockSize); @@ -498,6 +509,10 @@ addr += (count * 32 * m_bitSize); final int value = -((m_index << RWStore.OFFSET_BITS) + addr); + + if (m_statsBucket != null) { + m_statsBucket.allocate(size); + } return value; } else { @@ -651,4 +666,8 @@ return false; } } + + public void setBucketStats(Bucket b) { + m_statsBucket = b; + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 14:32:08 UTC (rev 3956) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 14:32:43 UTC (rev 3957) @@ -344,7 +344,7 @@ * @todo examine concurrency and lock usage for {@link #m_alloc} and the * rest of these lists. */ - private final ArrayList<Allocator> m_allocs; + private final ArrayList<FixedAllocator> m_allocs; /** * A fixed length array of lists of free {@link FixedAllocator}s with one @@ -453,6 +453,12 @@ private volatile BufferedWrite m_bufferedWrite; + /** + * Our StoreageStats objects + */ + private StorageStats m_storageStats; + private long m_storageStatsAddr = 0; + /** * <code>true</code> iff the backing store is open. */ @@ -556,7 +562,7 @@ m_commitList = new ArrayList<Allocator>(); - m_allocs = new ArrayList<Allocator>(); + m_allocs = new ArrayList<FixedAllocator>(); // m_freeBlobs = new ArrayList<BlobAllocator>(); @@ -607,6 +613,8 @@ m_maxFixedAlloc = m_allocSizes[m_allocSizes.length-1]*64; m_minFixedAlloc = m_allocSizes[0]*64; + + m_storageStats = new StorageStats(m_allocSizes); // commitChanges(null); } else { @@ -615,6 +623,20 @@ m_maxFixedAlloc = m_allocSizes[m_allocSizes.length-1]*64; m_minFixedAlloc = m_allocSizes[0]*64; + + if (m_storageStatsAddr != 0) { + long statsAddr = m_storageStatsAddr >> 16; + int statsLen = ((int) m_storageStatsAddr) & 0xFFFF; + byte[] stats = new byte[statsLen + 4]; // allow for checksum + getData(statsAddr, stats); + DataInputStream instr = new DataInputStream(new ByteArrayInputStream(stats)); + m_storageStats = new StorageStats(instr); + + for (FixedAllocator fa: m_allocs) { + m_storageStats.register(fa); + } + } + } final int maxBlockLessChk = m_maxFixedAlloc-4; @@ -806,8 +828,8 @@ cDefaultMetaBitsSize = strBuf.readInt(); final int allocBlocks = strBuf.readInt(); - strBuf.readInt(); // reserved5 - strBuf.readInt(); // reserved6 + m_storageStatsAddr = strBuf.readLong(); + strBuf.readInt(); // reserved7 strBuf.readInt(); // reserved8 strBuf.readInt(); // reserved9 @@ -951,7 +973,7 @@ final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); final int allocSize = strBuf.readInt(); // if Blob < 0 - final Allocator allocator; + final FixedAllocator allocator; final ArrayList<? extends Allocator> freeList; assert allocSize > 0; @@ -970,6 +992,10 @@ allocator.setFreeList(freeList); m_allocs.add(allocator); + + if (m_storageStats != null) { + m_storageStats.register(allocator); + } } } @@ -1016,6 +1042,10 @@ m_allocs.add(allocator); + if (m_storageStats != null) { + m_storageStats.register(allocator); + } + return allocator; } else { return list.remove(0); @@ -1370,7 +1400,7 @@ } m_allocationLock.lock(); try { - if (sze > m_maxFixedAlloc) { + if (sze > m_maxFixedAlloc-4) { freeBlob(addr, sze, context); } else { final Allocator alloc = getBlockByAddress(addr); @@ -1402,7 +1432,11 @@ if (sze < (m_maxFixedAlloc-4)) throw new IllegalArgumentException("Unexpected address size"); - final int alloc = m_maxFixedAlloc-4; + if (m_storageStats != null) { + m_storageStats.deleteBlob(sze); + } + + final int alloc = m_maxFixedAlloc-4; final int blcks = (alloc - 1 + sze)/alloc; // read in header block, then free each reference @@ -1413,9 +1447,11 @@ new ByteArrayInputStream(hdr, 0, hdr.length-4) ); try { final int allocs = instr.readInt(); + int rem = sze; for (int i = 0; i < allocs; i++) { final int nxt = instr.readInt(); - free(nxt, m_maxFixedAlloc); + free(nxt, rem < alloc ? rem : alloc); + rem -= alloc; } free(hdr_addr, hdr.length); @@ -1439,6 +1475,9 @@ try { final Allocator alloc = getBlockByAddress(addr); final int addrOffset = getOffset(addr); + if (alloc == null) { + throw new IllegalArgumentException("Invalid address provided to immediateFree: " + addr + ", size: " + sze); + } final long pa = alloc.getPhysicalAddress(addrOffset); alloc.free(addr, sze); // must clear after free in case is a blobHdr that requires reading! @@ -1501,7 +1540,7 @@ m_allocationLock.lock(); try { try { - final Allocator allocator; + final FixedAllocator allocator; final int i = fixedAllocatorIndex(size); if (context != null) { allocator = establishContextAllocation(context).getFreeFixed(i); @@ -1520,6 +1559,10 @@ log.trace("New FixedAllocator for " + block); m_allocs.add(allocator); + + if (m_storageStats != null) { + m_storageStats.register(allocator, true); + } } else { // Verify free list only has allocators with free bits if (log.isDebugEnabled()){ @@ -1533,7 +1576,7 @@ tsti++; } } - allocator = (Allocator) list.get(0); + allocator = list.get(0); } } @@ -1611,6 +1654,10 @@ if (log.isTraceEnabled()) log.trace("BLOB ALLOC: " + size); + + if (m_storageStats != null) { + m_storageStats.allocateBlob(size); + } final PSOutputStream psout = PSOutputStream.getNew(this, m_maxFixedAlloc, context); @@ -1802,10 +1849,9 @@ str.writeInt(cVersion); str.writeLong(m_lastDeferredReleaseTime); str.writeInt(cDefaultMetaBitsSize); - str.writeInt(m_allocSizes.length); - - str.writeInt(0); // reserved5 - str.writeInt(0); // reserved6 + str.writeInt(m_allocSizes.length); + str.writeLong(m_storageStatsAddr); + str.writeInt(0); // reserved7 str.writeInt(0); // reserved8 str.writeInt(0); // reserved9 @@ -1849,7 +1895,19 @@ try { checkDeferredFrees(true, journal); // free now if possible - + + // free old storageStatsAddr + if (m_storageStatsAddr != 0) { + int len = (int) (m_storageStatsAddr & 0xFFFF); + int addr = (int) (m_storageStatsAddr >> 16); + immediateFree(addr, len); + } + if (m_storageStats != null) { + byte[] buf = m_storageStats.getData(); + long addr = alloc(buf, buf.length, null); + m_storageStatsAddr = (addr << 16) + buf.length; + } + // Allocate storage for metaBits final long oldMetaBits = m_metaBitsAddr; final int oldMetaBitsSize = (m_metaBits.length + m_allocSizes.length + 1) * 4; @@ -1859,9 +1917,13 @@ if (physicalAddress(m_metaBitsAddr) == 0) { throw new IllegalStateException("Returned MetaBits Address not valid!"); } + + // TODO: assert that m_deferredFreeOut is empty! + assert m_deferredFreeOut.getBytesWritten() == 0; // Call immediateFree - no need to defer freeof metaBits, this // has to stop somewhere! + // No more allocations must be made immediateFree((int) oldMetaBits, oldMetaBitsSize); // save allocation headers @@ -1907,7 +1969,8 @@ // m_commitCallback.commitComplete(); // } - m_reopener.reopenChannel().force(false); // TODO, check if required! + // The Journal handles the force in doubleSync + // m_reopener.reopenChannel().force(false); // TODO, check if required! } catch (IOException e) { throw new StorageTerminalError("Unable to commit transaction", e); } finally { @@ -2476,7 +2539,7 @@ stats[i] = new AllocationStats(m_allocSizes[i]*64); } - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { Allocator alloc = (Allocator) allocs.next(); alloc.appendShortStats(str, stats); @@ -2612,7 +2675,7 @@ return getBlock(addr); } - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); Allocator alloc = null; while (allocs.hasNext()) { @@ -3044,7 +3107,7 @@ */ public int getFixedAllocatorCount() { int fixed = 0; - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { if (allocs.next() instanceof FixedAllocator) { fixed++; @@ -3059,7 +3122,7 @@ */ public int getAllocatedBlocks() { int allocated = 0; - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { final Allocator alloc = allocs.next(); if (alloc instanceof FixedAllocator) { @@ -3075,12 +3138,10 @@ */ public long getFileStorage() { long allocated = 0; - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { - final Allocator alloc = allocs.next(); - if (alloc instanceof FixedAllocator) { - allocated += ((FixedAllocator) alloc).getFileStorage(); - } + final FixedAllocator alloc = allocs.next(); + allocated += ((FixedAllocator) alloc).getFileStorage(); } return allocated; @@ -3093,7 +3154,7 @@ */ public long getAllocatedSlots() { long allocated = 0; - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { final Allocator alloc = allocs.next(); if (alloc instanceof FixedAllocator) { @@ -3221,7 +3282,8 @@ immediateFree(-nxtAddr, bloblen); } else { - immediateFree(nxtAddr, 0); // size ignored for FreeAllocators + // The lack of size messes with the stats + immediateFree(nxtAddr, 1); // size ignored for FixedAllocators } nxtAddr = strBuf.readInt(); @@ -4112,5 +4174,9 @@ public int getMaxBlobSize() { return m_maxBlobAllocSize-4; // allow for checksum } + + public StorageStats getStorageStats() { + return m_storageStats; + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-18 19:26:25
|
Revision: 3969 http://bigdata.svn.sourceforge.net/bigdata/?rev=3969&view=rev Author: thompsonbry Date: 2010-11-18 19:26:19 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Renamed m_bits to m_live to be in keeping with the wiki documentation. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-11-18 19:18:43 UTC (rev 3968) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-11-18 19:26:19 UTC (rev 3969) @@ -31,7 +31,7 @@ /** * Bit maps for an allocator. The allocator is a bit map managed as int[]s. * - * @todo change to make {@link #m_transients}, {@link #m_bits}, and + * @todo change to make {@link #m_transients}, {@link #m_live}, and * {@link #m_commit} final fields and then modify {@link FixedAllocator} * to use {@link System#arraycopy(Object, int, Object, int, int)} to copy * the data rather than cloning it. @@ -70,7 +70,7 @@ * Just the newly allocated bits. This will be copied onto {@link #m_commit} * when the current native transaction commits. */ - final int m_bits[]; + final int m_live[]; /** * All of the bits from the commit point on entry to the current native * transaction plus any newly allocated bits. @@ -86,7 +86,7 @@ // m_writeCache = cache; m_ints = bitSize; m_commit = new int[bitSize]; - m_bits = new int[bitSize]; + m_live = new int[bitSize]; m_transients = new int[bitSize]; } @@ -98,7 +98,7 @@ // Now check to see if it allocated final int bit = (addr - m_addr) / size; - return RWStore.tstBit(m_bits, bit); + return RWStore.tstBit(m_live, bit); } public boolean addressInRange(final int addr, final int size) { @@ -116,7 +116,7 @@ } public boolean freeBit(final int bit) { - if (!RWStore.tstBit(m_bits, bit)) { + if (!RWStore.tstBit(m_live, bit)) { throw new IllegalArgumentException("Freeing bit not set"); } @@ -129,7 +129,7 @@ * output to the file by removing any pending write to the now freed * address. On large transaction scopes this may be significant. */ - RWStore.clrBit(m_bits, bit); + RWStore.clrBit(m_live, bit); if (!RWStore.tstBit(m_commit, bit)) { RWStore.clrBit(m_transients, bit); @@ -162,7 +162,7 @@ final int bit = RWStore.fndBit(m_transients, m_ints); if (bit != -1) { - RWStore.setBit(m_bits, bit); + RWStore.setBit(m_live, bit); RWStore.setBit(m_transients, bit); return bit; @@ -173,7 +173,7 @@ public boolean hasFree() { for (int i = 0; i < m_ints; i++) { - if (m_bits[i] != 0xFFFFFFFF) { + if (m_live[i] != 0xFFFFFFFF) { return true; } } @@ -185,7 +185,7 @@ int total = m_ints * 32; int allocBits = 0; for (int i = 0; i < total; i++) { - if (RWStore.tstBit(m_bits, i)) { + if (RWStore.tstBit(m_live, i)) { allocBits++; } } @@ -211,7 +211,7 @@ final int total = m_ints * 32; for (int i = 0; i < total; i++) { - if (RWStore.tstBit(m_bits, i)) { + if (RWStore.tstBit(m_live, i)) { addrs.add(new Integer(rootAddr - i)); } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-18 19:18:43 UTC (rev 3968) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-18 19:26:19 UTC (rev 3969) @@ -61,7 +61,7 @@ if (log.isDebugEnabled()) log.debug("Restored index " + index + " with " + getStartAddr() - + "[" + fb.m_bits[0] + "] from " + m_diskAddr); + + "[" + fb.m_live[0] + "] from " + m_diskAddr); m_index = index; } @@ -111,7 +111,7 @@ final int bit = offset % allocBlockRange; - if (RWStore.tstBit(block.m_bits, bit)) { + if (RWStore.tstBit(block.m_live, bit)) { return RWStore.convertAddr(block.m_addr) + ((long) m_size * bit); } else { return 0L; @@ -170,7 +170,7 @@ try { final AllocBlock fb = m_allocBlocks.get(0); if (log.isDebugEnabled()) - log.debug("writing allocator " + m_index + " for " + getStartAddr() + " with " + fb.m_bits[0]); + log.debug("writing allocator " + m_index + " for " + getStartAddr() + " with " + fb.m_live[0]); final byte[] buf = new byte[1024]; final DataOutputStream str = new DataOutputStream(new FixedOutputStream(buf)); try { @@ -182,11 +182,11 @@ str.writeInt(block.m_addr); for (int i = 0; i < m_bitSize; i++) { - str.writeInt(block.m_bits[i]); + str.writeInt(block.m_live[i]); } // if (!m_store.isSessionPreserved()) { - block.m_transients = block.m_bits.clone(); + block.m_transients = block.m_live.clone(); // } /** @@ -196,11 +196,11 @@ if (m_context != null) { assert block.m_saveCommit != null; - block.m_saveCommit = block.m_bits.clone(); + block.m_saveCommit = block.m_live.clone(); // } else if (m_store.isSessionPreserved()) { // block.m_commit = block.m_transients.clone(); } else { - block.m_commit = block.m_bits.clone(); + block.m_commit = block.m_live.clone(); } } // add checksum @@ -243,17 +243,17 @@ block.m_addr = str.readInt(); for (int i = 0; i < m_bitSize; i++) { - block.m_bits[i] = str.readInt(); + block.m_live[i] = str.readInt(); /** * Need to calc how many free blocks are available, minor * optimization by checking against either empty or full to * avoid scanning every bit unnecessarily **/ - if (block.m_bits[i] == 0) { // empty + if (block.m_live[i] == 0) { // empty m_freeBits += 32; - } else if (block.m_bits[i] != 0xFFFFFFFF) { // not full - final int anInt = block.m_bits[i]; + } else if (block.m_live[i] != 0xFFFFFFFF) { // not full + final int anInt = block.m_live[i]; for (int bit = 0; bit < 32; bit++) { if ((anInt & (1 << bit)) == 0) { m_freeBits++; @@ -262,8 +262,8 @@ } } - block.m_transients = (int[]) block.m_bits.clone(); - block.m_commit = (int[]) block.m_bits.clone(); + block.m_transients = (int[]) block.m_live.clone(); + block.m_commit = (int[]) block.m_live.clone(); if (m_startAddr == 0) { m_startAddr = block.m_addr; @@ -642,7 +642,7 @@ final int bit = offset % allocBlockRange; - return RWStore.tstBit(block.m_bits, bit); + return RWStore.tstBit(block.m_live, bit); } public boolean isCommitted(int offset) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-11-29 09:53:36
|
Revision: 3985 http://bigdata.svn.sourceforge.net/bigdata/?rev=3985&view=rev Author: martyncutcher Date: 2010-11-29 09:53:27 +0000 (Mon, 29 Nov 2010) Log Message: ----------- 1) Extend the metabits header. 2) Add more relevant data to the storage stats. 3) Enable more flexible allocations of contiguous reservations in preparation for direct buffer allocation. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-11-24 21:40:07 UTC (rev 3984) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-11-29 09:53:27 UTC (rev 3985) @@ -237,4 +237,43 @@ m_commit = m_saveCommit; m_saveCommit = null; } + + /** + * Must release allocations made by this allocator. + * + * The commit bits are the old transient bits, so any allocated bits + * set in live, but not in commit, were set within this context. + * + * The m_commit is the m_transients bits at the point of the + * link of the allocationContext with this allocator, bits set in m_live + * that are not set in m_commit, were made by this allocator for the + * aborted context. + * + * L 1100 0110 AC 0111 AB 0110 + * T 1100 1110 1111 1110 + * C 1100 1100 1110 1100 + */ + public void abortshadow() { + for (int i = 0; i < m_live.length; i++) { + m_live[i] &= m_commit[i]; + m_transients[i] = m_live[i] | m_saveCommit[i]; + } + m_commit = m_saveCommit; + } + + /** + * When a session is active, the transient bits do not equate to an ORing + * of the committed bits and the live bits, but rather an ORing of the live + * with all the committed bits since the start of the session. + * When the session is released, the state is restored to an ORing of the + * live and the committed, thus releasing slots for re-allocation. + */ + public void releaseSession() { + if (m_addr != 0) { // check active! + for (int i = 0; i < m_live.length; i++) { + m_transients[i] = m_live[i] | m_commit[i]; + } + } + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-24 21:40:07 UTC (rev 3984) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-29 09:53:27 UTC (rev 3985) @@ -42,8 +42,10 @@ public class FixedAllocator implements Allocator { private static final Logger log = Logger.getLogger(FixedAllocator.class); + + private final int cModAllocation = 1 << RWStore.ALLOCATION_SCALEUP; + private final int cMinAllocation = cModAllocation * 1; // must be multiple of cModAllocation -// final private RWWriteCacheService m_writeCache; volatile private int m_freeBits; volatile private int m_freeTransients; @@ -111,7 +113,9 @@ final int bit = offset % allocBlockRange; - if (RWStore.tstBit(block.m_live, bit)) { + if (RWStore.tstBit(block.m_live, bit) + || (this.m_sessionActive && RWStore.tstBit(block.m_transients, bit))) + { return RWStore.convertAddr(block.m_addr) + ((long) m_size * bit); } else { return 0L; @@ -145,6 +149,14 @@ } volatile private IAllocationContext m_context; + + /** + * Indicates whether session protection has been used to protect + * store from re-allocating allocations reachable from read-only + * requests and concurrent transactions. + */ + private boolean m_sessionActive; + public void setAllocationContext(final IAllocationContext context) { if (context == null && m_context != null) { // restore commit bits in AllocBlocks @@ -161,6 +173,21 @@ } /** + * Unwinds the allocations made within the context and clears + */ + public void abortAllocationContext(final IAllocationContext context) { + if (context != null && m_context == context) { + // restore commit bits in AllocBlocks + for (AllocBlock allocBlock : m_allocBlocks) { + allocBlock.abortshadow(); + } + m_context = null; + } else { + throw new IllegalArgumentException(); + } + } + + /** * write called on commit, so this is the point when "transient frees" - the * freeing of previously committed memory can be made available since we * are creating a new commit point - the condition being that m_freeBits @@ -174,6 +201,8 @@ final byte[] buf = new byte[1024]; final DataOutputStream str = new DataOutputStream(new FixedOutputStream(buf)); try { + m_sessionActive = m_store.isSessionProtected(); + str.writeInt(m_size); final Iterator<AllocBlock> iter = m_allocBlocks.iterator(); @@ -185,9 +214,9 @@ str.writeInt(block.m_live[i]); } -// if (!m_store.isSessionPreserved()) { + if (!m_sessionActive) { block.m_transients = block.m_live.clone(); -// } + } /** * If this allocator is shadowed then copy the new committed @@ -314,29 +343,12 @@ m_size = size; - /* - * For smaller allocations we'll allocate a larger span, this is needed - * to ensure the minimum allocation is large enough to guarantee - * a unique address for a BlobAllocator. - */ - if (m_size < 256) { - /* - * Note: 64 ints is 256 bytes is 2048 bits, so 2048 allocation - * slots. - */ - m_bitSize = 64; - } else { - /* - * Note: 32 ints is 128 bytes is 1024 bits, so 1024 allocation - * slots. - */ - m_bitSize = 32; - } + m_bitSize = calcBitSize(true, size, cMinAllocation, cModAllocation); // m_writeCache = cache; // number of blocks in this allocator, bitSize plus 1 for start address - final int numBlocks = 255 / (m_bitSize + 1); + final int numBlocks = 254 / (m_bitSize + 1); /* * Create AllocBlocks for this FixedAllocator, but do not allocate @@ -351,6 +363,82 @@ m_freeTransients = 0; m_freeBits = 32 * m_bitSize * numBlocks; } + + /** + * This determines the size of the reservation required in terms of + * the number of ints each holding bits for 32 slots. + * + * The minimum return value will be 1, for a single int holiding 32 bits. + * + * The maximum value will be the number of ints required to fill the minimum + * reservation. + * + * The minimum reservation will be some multiple of the + * address multiplier that allows alloction blocks to address large addresses + * with an INT32. For example, by setting a minimum reservation at 128K, the + * allocation blocks INT32 start address may be multiplied by 128K to provide + * a physical address. + * + * The minReserve must be a power of 2, eg 1K, 2k or 4K.. etc + * + * A standard minReserve of 16K is plenty big enough, enabling 32TB of + * addressable store. The logical maximum used store is calculated as the + * maximum fixed allocation size * MAX_INT. So a store with a maximum + * fixed slot size of 4K could only allocated 8TB. + * + * Since the allocation size must be MOD 0 the minReserve, the lower the + * minReserve the smaller the allocation may be required for larger + * slot sizes. + * + * Another consideration is file locality. In this case the emphasis is + * on larger contiguous areas to improve the likely locality of allocations + * made by a FixedAllocator. Here the addressability implied by the reserve + * is not an issue, and larger reserves are chosen to improve locality. The + * downside is a potential for more wasted space, but this + * reduces as the store size grows and in large stores (> 10GB) becomes + * insignificant. + * + * Therefore, if a FixedAllocator is to be used in a large store and + * locality needs to be optimised for SATA disk access then the minReserve + * should be high = say 128K, while if the allocator is tuned to ByteBuffer + * allocation, a minallocation of 8 to 16K is more suitable. + * + * A final consideration is allocator reference efficiency in the sense + * to maximise the amount of allocations that can be made. By this I mean + * just how close we can get to MAX_INT allocations. For example, if we + * allow for upto 8192 allocations from a single allocator, but in + * practice average closer to 4096 then the maximum number of allocations + * comes down from MAX_INT to MAX_INT/2. This is also a consideration when + * considering max fixed allocator size, since if we require a large number + * of Blobs this reduces the amount of "virtual" allocations by at least + * a factro of three for each blob (at least 2 fixed allocations for + * content and 1 more for the header). A variation on the current Blob + * implementation could include the header in the first allocation, thus + * reducing the minimum Blob allocations from 3 to 2, but the point still + * holds that too small a max fixed allocation could rmatically reduce the + * number of allocations that could be made. + * + * @param alloc the slot size to be managed + * @param minReserve the minimum reservation in bytes + * @return the size of the int array + */ + public static int calcBitSize(final boolean optDensity, final int alloc, final int minReserve, final int modAllocation) { + final int intAllocation = 32 * alloc; // min 32 bits + + // we need to find smallest number of ints * the intAllocation + // such that totalAllocation % minReserve is 0 + // example 6K intAllocation would need 8 ints for 48K for 16K min + // likewise a 24K intAllocation would require 2 ints + // if optimising for density set min ints to 8 + int nints = optDensity ? 8 : 1; + while ((nints * intAllocation) < minReserve) nints++; + + while ((nints * intAllocation) % modAllocation != 0) nints++; + + System.out.println("calcBitSize for " + alloc + " returns " + nints); + + return nints; + } public String getStats(final AtomicLong counter) { @@ -675,4 +763,14 @@ public void setBucketStats(Bucket b) { m_statsBucket = b; } + + public void releaseSession() { + if (this.m_sessionActive) { + if (log.isTraceEnabled()) + log.trace("Allocator: #" + m_index + " releasing session protection"); + for (AllocBlock ab : m_allocBlocks) { + ab.releaseSession(); + } + } + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-24 21:40:07 UTC (rev 3984) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-29 09:53:27 UTC (rev 3985) @@ -357,11 +357,6 @@ // + ", last alloc: " + precount); // } - if (log.isDebugEnabled()) - log.debug("Writing BlobHdrIdx with " + m_blobHdrIdx + " allocations"); - - // DO NOT USE BLOB ALLOCATOR - // addr = m_store.registerBlob(addr); // returns handle } catch (IOException e) { // e.printStackTrace(); throw new RuntimeException(e); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-24 21:40:07 UTC (rev 3984) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-29 09:53:27 UTC (rev 3985) @@ -477,7 +477,7 @@ * <code>true</code> iff the backing store is open. */ private volatile boolean m_open = true; - + class WriteCacheImpl extends WriteCache.FileChannelScatteredWriteCache { public WriteCacheImpl(final ByteBuffer buf, final boolean useChecksum, @@ -852,9 +852,10 @@ final int allocBlocks = strBuf.readInt(); m_storageStatsAddr = strBuf.readLong(); - strBuf.readInt(); // reserved7 - strBuf.readInt(); // reserved8 - strBuf.readInt(); // reserved9 + // and let's read in those reserved ints + for (int i = 0; i < cReservedMetaBits; i++) { + strBuf.readInt(); + } m_allocSizes = new int[allocBlocks]; for (int i = 0; i < allocBlocks; i++) { @@ -992,7 +993,8 @@ FileChannelUtility.readAll(m_reopener, ByteBuffer.wrap(buf), addr); - final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); + final ByteArrayInputStream baBuf = new ByteArrayInputStream(buf); + final DataInputStream strBuf = new DataInputStream(baBuf); final int allocSize = strBuf.readInt(); // if Blob < 0 final FixedAllocator allocator; @@ -1009,6 +1011,14 @@ freeList = m_freeFixed[index]; allocator.read(strBuf); + final int chk = ChecksumUtility.getCHK().checksum(buf, + buf.length - baBuf.available()); + + int tstChk = strBuf.readInt(); + if (tstChk != chk) { + throw new IllegalStateException("FixedAllocator checksum error"); + } + allocator.setDiskAddr(i); // store bit, not physical // address! allocator.setFreeList(freeList); @@ -1420,7 +1430,7 @@ if (sze > m_maxFixedAlloc-4) { freeBlob(addr, sze, context); } else { - final Allocator alloc = getBlockByAddress(addr); + final FixedAllocator alloc = getBlockByAddress(addr); /* * There are a few conditions here. If the context owns the * allocator and the allocation was made by this context then it @@ -1429,22 +1439,29 @@ * AllocationContexts, in this situation, the free must ALWAYS * be deferred. * + * If the MIN_RELEASE_AGE is ZERO then we can protect allocations + * and read-only transactions with Session protection, avoiding + * the need to manage deferred frees. + * * FIXME We need unit tests when MIN_RELEASE_AGE is GT ZERO. * * FIXME We need unit test when MIN_RELEASE_AGE is ZERO AND * there are open read-only transactions. */ - boolean alwaysDefer = m_minReleaseAge > 0L - || m_activeTxCount > 0; - if (!alwaysDefer) - alwaysDefer = context == null && !m_contexts.isEmpty(); - if (alwaysDefer) - if (log.isDebugEnabled()) - log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); - if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { - deferFree(addr, sze); + if (m_minReleaseAge == 0) { + immediateFree(addr, sze); } else { - immediateFree(addr, sze); + boolean alwaysDefer = m_activeTxCount > 0; + if (!alwaysDefer) + alwaysDefer = context == null && !m_contexts.isEmpty(); + if (alwaysDefer) + if (log.isDebugEnabled()) + log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); + if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { + deferFree(addr, sze); + } else { + immediateFree(addr, sze); + } } } } finally { @@ -1452,7 +1469,31 @@ } } + + long getHistoryRetention() { + return m_minReleaseAge; + } + boolean isSessionProtected() { + return m_minReleaseAge == 0 && (m_activeTxCount > 0 || !m_contexts.isEmpty()); + } + + /** + * Sessions will only be used to protect transactions and read-only views + * when the m_minReleaseAge is no zero, otherwise the deferredFree + * approach will be used. + * + * When called, will call through to the Allocators to re-sync the + * transient bits with the committed and live. + */ + void releaseSessions() { + if (m_minReleaseAge == 0) { + for (FixedAllocator fa : m_allocs) { + fa.releaseSession(); + } + } + } + private boolean freeBlob(final int hdr_addr, final int sze, final IAllocationContext context) { if (sze < (m_maxFixedAlloc-4)) throw new IllegalArgumentException("Unexpected address size"); @@ -1498,7 +1539,7 @@ m_allocationLock.lock(); try { - final Allocator alloc = getBlockByAddress(addr); + final FixedAllocator alloc = getBlockByAddress(addr); final int addrOffset = getOffset(addr); if (alloc == null) { throw new IllegalArgumentException("Invalid address provided to immediateFree: " + addr + ", size: " + sze); @@ -1879,9 +1920,10 @@ str.writeInt(m_allocSizes.length); str.writeLong(m_storageStatsAddr); - str.writeInt(0); // reserved7 - str.writeInt(0); // reserved8 - str.writeInt(0); // reserved9 + // Let's reserve ourselves some space + for (int i = 0; i < cReservedMetaBits; i++) { + str.writeInt(0); + } for (int i = 0; i < m_allocSizes.length; i++) { str.writeInt(m_allocSizes[i]); @@ -2126,18 +2168,24 @@ final private int cVersion = 0x0400; /** + * cReservedMetaBits is the reserved space in the metaBits header + * to alloc for binary compatibility moving forward. + * + * If we need to add int values to the header we can do so and reduce the + * reservation by 1 each time + */ + final int cReservedMetaBits = 20; + + /** * MetaBits Header * 0 int version * 1-2 int[2] long deferredFree * 3 int defaultMetaBitsSize * 4 int length of allocation sizes - * 5 int reserved - * 6 int reserved - * 7 int reserved - * 8 int reserved - * 9 int reserved + * 5-6 int[2] storage stats addr + * + 20 reserved */ - final private int cMetaHdrFields = 10; + final private int cMetaHdrFields = 7 + cReservedMetaBits; /** * @see Options#META_BITS_SIZE */ @@ -2697,7 +2745,7 @@ if (addr > 0) { return addr & 0xFFFFFFE0; } else { - final Allocator allocator = getBlock(addr); + final FixedAllocator allocator = getBlock(addr); final int offset = getOffset(addr); final long laddr = allocator.getPhysicalAddress(offset); @@ -2709,14 +2757,14 @@ * handle dual address format, if addr is positive then it is the physical * address, so the Allocators must be searched. **/ - Allocator getBlockByAddress(final int addr) { + FixedAllocator getBlockByAddress(final int addr) { if (addr < 0) { return getBlock(addr); } final Iterator<FixedAllocator> allocs = m_allocs.iterator(); - Allocator alloc = null; + FixedAllocator alloc = null; while (allocs.hasNext()) { alloc = allocs.next(); @@ -2733,10 +2781,10 @@ // return (-addr) >>> OFFSET_BITS; // } - private Allocator getBlock(final int addr) { + private FixedAllocator getBlock(final int addr) { final int index = (-addr) >>> OFFSET_BITS; - return (Allocator) m_allocs.get(index); + return m_allocs.get(index); } private int getOffset(final int addr) { @@ -3422,6 +3470,29 @@ } } + /** + * The ContextAllocation object manages a freeList of associated allocators + * and an overall list of allocators. When the context is detached, all + * allocators must be released and any that has available capacity will be + * assigned to the global free lists. + * + * @param context + * The context to be released from all FixedAllocators. + */ + public void abortContext(final IAllocationContext context) { + assertOpen(); + m_allocationLock.lock(); + try { + final ContextAllocation alloc = m_contexts.remove(context); + + if (alloc != null) { + alloc.release(); + } + } finally { + m_allocationLock.unlock(); + } + } + /** * The ContextAllocation class manages a set of Allocators. * @@ -3487,6 +3558,24 @@ // freeBlobs.addAll(m_freeBlobs); } + void abort() { + final ArrayList<FixedAllocator> freeFixed[] = m_parent != null ? m_parent.m_freeFixed + : m_store.m_freeFixed; + + final IAllocationContext pcontext = m_parent == null ? null + : m_parent.m_context; + + for (FixedAllocator f : m_allFixed) { + f.abortAllocationContext(pcontext); + } + + for (int i = 0; i < m_freeFixed.length; i++) { + freeFixed[i].addAll(m_freeFixed[i]); + } + +// freeBlobs.addAll(m_freeBlobs); + } + FixedAllocator getFreeFixed(final int i) { final ArrayList<FixedAllocator> free = m_freeFixed[i]; if (free.size() == 0) { @@ -4242,6 +4331,10 @@ m_activeTxCount--; if(log.isInfoEnabled()) log.info("#activeTx="+m_activeTxCount); + + if (m_activeTxCount == 0) { + releaseSessions(); + } } finally { m_allocationLock.unlock(); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-11-24 21:40:07 UTC (rev 3984) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-11-29 09:53:27 UTC (rev 3985) @@ -31,6 +31,7 @@ import java.math.BigDecimal; import java.math.RoundingMode; import java.util.ArrayList; +import java.util.Formatter; /** * Maintains stats on the RWStore allocations, useful for tuning Allocator @@ -64,35 +65,63 @@ * */ public class StorageStats { + final int cVersion = 0x0100; + final int m_maxFixed; public class BlobBucket { final int m_size; + long m_allocationSize; long m_allocations; long m_deletes; + long m_deleteSize; public BlobBucket(final int size) { m_size = size; } public BlobBucket(DataInputStream instr) throws IOException { m_size = instr.readInt(); + m_allocationSize = instr.readLong(); m_allocations = instr.readLong(); + m_deleteSize = instr.readLong(); m_deletes = instr.readLong(); } public void write(DataOutputStream outstr) throws IOException { outstr.writeInt(m_size); + outstr.writeLong(m_allocationSize); outstr.writeLong(m_allocations); + outstr.writeLong(m_deleteSize); outstr.writeLong(m_deletes); } - public void delete() { + public void delete(int sze) { + m_deleteSize += sze; m_deletes++; } - public void allocate() { + public void allocate(int sze) { + m_allocationSize += sze; m_allocations++; } + public long active() { + return m_allocations - m_deletes; + } + public int meanAllocation() { + if (m_allocations == 0) + return 0; + return (int) (m_allocationSize / m_allocations); + } + public float churn() { + if (active() == 0) + return m_allocations; + + BigDecimal allocs = new BigDecimal(m_allocations); + BigDecimal used = new BigDecimal(active()); + + return allocs.divide(used, 2, RoundingMode.HALF_UP).floatValue(); + } } public class Bucket { + final int m_start; final int m_size; int m_allocators; long m_totalSlots; @@ -101,11 +130,13 @@ long m_sizeAllocations; long m_sizeDeletes; - public Bucket(final int size) { + public Bucket(final int size, final int startRange) { m_size = size; + m_start = startRange; } public Bucket(DataInputStream instr) throws IOException { m_size = instr.readInt(); + m_start = instr.readInt(); m_allocators = instr.readInt(); m_slotAllocations = instr.readLong(); m_slotDeletes = instr.readLong(); @@ -115,6 +146,7 @@ } public void write(DataOutputStream outstr) throws IOException { outstr.writeInt(m_size); + outstr.writeInt(m_start); outstr.writeInt(m_allocators); outstr.writeLong(m_slotAllocations); outstr.writeLong(m_slotDeletes); @@ -126,6 +158,12 @@ if (sze < 0) throw new IllegalArgumentException("delete requires positive size, got: " + sze); + if (m_size > 64 && sze < 64) { + // if called from deferFree then may not include size. If so then use + // average size of slots to date as best running estimate. + sze = meanAllocation(); + } + if (sze > m_size) { // sze = ((sze - 1 + m_maxFixed)/ m_maxFixed) * 4; // Blob header @@ -151,6 +189,10 @@ return m_slotAllocations - m_slotDeletes; } + public long emptySlots() { + return m_totalSlots - usedSlots(); + } + public long usedStore() { return m_sizeAllocations - m_sizeDeletes; } @@ -160,23 +202,21 @@ if (usedStore() == 0) return 0.0f; - BigDecimal size = new BigDecimal(m_size * usedSlots()); - BigDecimal store = new BigDecimal(100 * usedStore()); - store = store.divide(size, 2, RoundingMode.HALF_UP); - BigDecimal total = new BigDecimal(100); + BigDecimal size = new BigDecimal(reservedStore()); + BigDecimal store = new BigDecimal(100 * (reservedStore() - usedStore())); - return total.subtract(store).floatValue(); + return store.divide(size, 2, RoundingMode.HALF_UP).floatValue(); } - public float totalWaste() { + public float totalWaste(long total) { if (usedStore() == 0) return 0.0f; - BigDecimal size = new BigDecimal(m_size * m_totalSlots); - BigDecimal store = new BigDecimal(100 * usedStore()); - store = store.divide(size, 2, RoundingMode.HALF_UP); - BigDecimal total = new BigDecimal(100); + long slotWaste = reservedStore() - usedStore(); - return total.subtract(store).floatValue(); + BigDecimal localWaste = new BigDecimal(100 * slotWaste); + BigDecimal totalWaste = new BigDecimal(total); + + return localWaste.divide(totalWaste, 2, RoundingMode.HALF_UP).floatValue(); } public long reservedStore() { return m_size * m_totalSlots; @@ -184,6 +224,40 @@ public void addAlocator() { m_allocators++; } + public float slotChurn() { + // Handle case where we may have deleted all allocations + if (usedSlots() == 0) + return m_slotAllocations; + + BigDecimal allocs = new BigDecimal(m_slotAllocations); + BigDecimal used = new BigDecimal(usedSlots()); + + return allocs.divide(used, 2, RoundingMode.HALF_UP).floatValue(); + } + public float slotsUnused() { + BigDecimal used = new BigDecimal(100 * (m_totalSlots-usedSlots())); + BigDecimal total = new BigDecimal(m_totalSlots); + + return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); + } + public float percentAllocations(long totalAllocations) { + BigDecimal used = new BigDecimal(100 * m_slotAllocations); + BigDecimal total = new BigDecimal(totalAllocations); + + return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); + } + public float percentSlotsInuse(long totalInuse) { + BigDecimal used = new BigDecimal(100 * usedSlots()); + BigDecimal total = new BigDecimal(totalInuse); + + return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); + } + public int meanAllocation() { + if (m_slotAllocations == 0) + return 0; + + return (int) (m_sizeAllocations / m_slotAllocations); + } } final ArrayList<Bucket> m_buckets; @@ -199,8 +273,10 @@ */ public StorageStats(final int[] buckets) { m_buckets = new ArrayList<Bucket>(); + int prevLimit = 0; for (int i = 0; i < buckets.length; i++) { - m_buckets.add(new Bucket(buckets[i]*64)); // slot sizes are 64 multiples + m_buckets.add(new Bucket(buckets[i]*64, prevLimit)); // slot sizes are 64 multiples + prevLimit = buckets[i]*64; } // last fixed allocator needed to compute BlobBuckets m_maxFixed = m_buckets.get(buckets.length-1).m_size; @@ -223,6 +299,10 @@ * @throws IOException */ public StorageStats(final DataInputStream instr) throws IOException { + int version = instr.readInt(); + if (cVersion != version) { + throw new IllegalStateException("StorageStats object is wrong version"); + } m_buckets = new ArrayList<Bucket>(); int nbuckets = instr.readInt(); for (int i = 0; i < nbuckets; i++) { @@ -242,6 +322,8 @@ ByteArrayOutputStream outb = new ByteArrayOutputStream(); DataOutputStream outd = new DataOutputStream(outb); + outd.writeInt(cVersion); + outd.writeInt(m_buckets.size()); for (Bucket b : m_buckets) { @@ -266,14 +348,14 @@ m_blobAllocation += sze; // increment blob bucket - findBlobBucket(sze).allocate(); + findBlobBucket(sze).allocate(sze); } public void deleteBlob(int sze) { m_blobDeletion -= sze; // decrement blob bucket - findBlobBucket(sze).delete(); + findBlobBucket(sze).delete(sze); } private BlobBucket findBlobBucket(final int sze) { @@ -307,57 +389,88 @@ str.append("\n-------------------------\n"); str.append("RWStore Allocator Summary\n"); str.append("-------------------------\n"); - str.append(padRight("AllocatorSize", 16)); - str.append(padLeft("AllocatorCount", 16)); - str.append(padLeft("SlotsAllocated", 16)); - str.append(padLeft("SlotsRecycled", 16)); - str.append(padLeft("SlotsInUse", 16)); - str.append(padLeft("SlotsReserved", 16)); - str.append(padLeft("BytesReserved", 16)); - str.append(padLeft("BytesAppData", 16)); - str.append(padLeft("%SlotWaste", 16)); - str.append(padLeft("%StoreWaste", 16)); - str.append(padLeft("%AppData", 16)); - str.append(padLeft("%StoreFile", 16)); - str.append("\n"); + str.append(String.format("%-16s %16s %16s %16s %16s %16s %16s %16s %16s %16s %16s %16s %16s %16s %16s %16s %16s %16s \n", + "AllocatorSize", + "AllocatorCount", + "SlotsAllocated", + "%SlotsAllocated", + "SlotsRecycled", + "SlotChurn", + "SlotsInUse", + "%SlotsInUse", + "MeanAllocation", + "SlotsReserved", + "%SlotsUnused", + "BytesReserved", + "BytesAppData", + "%SlotWaste", + "%AppData", + "%StoreFile", + "%TotalWaste", + "%FileWaste" + )); long totalAppData = 0; long totalFileStore = 0; + long totalAllocations = 0; + long totalInuse = 0; for (Bucket b: m_buckets) { totalAppData += b.usedStore(); totalFileStore += b.reservedStore(); + totalAllocations += b.m_slotAllocations; + totalInuse += b.usedSlots(); } + long totalWaste = totalFileStore - totalAppData; + for (Bucket b: m_buckets) { - str.append(padRight("" + b.m_size, 16)); - str.append(padLeft("" + b.m_allocators, 16)); - str.append(padLeft("" + b.m_slotAllocations, 16)); - str.append(padLeft("" + b.m_slotDeletes, 16)); - str.append(padLeft("" + b.usedSlots(), 16)); - str.append(padLeft("" + b.m_totalSlots, 16)); - str.append(padLeft("" + b.reservedStore(), 16)); - str.append(padLeft("" + b.usedStore(), 16)); - str.append(padLeft("" + b.slotWaste() + "%", 16)); - str.append(padLeft("" + b.totalWaste() + "%", 16)); - str.append(padLeft("" + dataPercent(b.usedStore(), totalAppData) + "%", 16)); - str.append(padLeft("" + dataPercent(b.reservedStore(), totalFileStore) + "%", 16)); - str.append("\n"); + str.append(String.format("%-16d %16d %16d %16.2f %16d %16.2f %16d %16.2f %16d %16d %16.2f %16d %16d %16.2f %16.2f %16.2f %16.2f %16.2f \n", + b.m_size, + b.m_allocators, + b.m_slotAllocations, + b.percentAllocations(totalAllocations), + b.m_slotDeletes, + b.slotChurn(), + b.usedSlots(), + b.percentSlotsInuse(totalInuse), + b.meanAllocation(), + b.m_totalSlots, + b.slotsUnused(), + b.reservedStore(), + b.usedStore(), + b.slotWaste(), + dataPercent(b.usedStore(), totalAppData), + dataPercent(b.reservedStore(), totalFileStore), + b.totalWaste(totalWaste), + b.totalWaste(totalFileStore) + )); } str.append("\n-------------------------\n"); str.append("BLOBS\n"); str.append("-------------------------\n"); - str.append(padRight("Bucket", 10)); - str.append(padLeft("Allocations", 12)); - str.append(padLeft("Deletes", 12)); - str.append(padLeft("Current", 12)); - str.append("\n"); + str.append(String.format("%-10s %12s %12s %12s %12s %12s %12s %12s %12s\n", + "Bucket(K)", + "Allocations", + "Allocated", + "Deletes", + "Deleted", + "Current", + "Data", + "Mean", + "Churn")); for (BlobBucket b: m_blobBuckets) { - str.append(padRight("" + (b.m_size/1024) + "K", 10)); - str.append(padLeft("" + b.m_allocations, 12)); - str.append(padLeft("" + b.m_deletes, 12)); - str.append(padLeft("" + (b.m_allocations - b.m_deletes), 12)); - str.append("\n"); + str.append(String.format("%-10d %12d %12d %12d %12d %12d %12d %12d %12.2f\n", + b.m_size/1024, + b.m_allocations, + b.m_allocationSize, + b.m_deletes, + b.m_deleteSize, + (b.m_allocations - b.m_deletes), + (b.m_allocationSize - b.m_deleteSize), + b.meanAllocation(), + b.churn() + )); } } @@ -368,32 +481,4 @@ return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); } - - public static String padLeft(String str, int minlen) { - if (str.length() >= minlen) - return str; - - StringBuffer out = new StringBuffer(); - int pad = minlen - str.length(); - while (pad-- > 0) { - out.append(' '); - } - out.append(str); - - return out.toString(); - } - - public static String padRight(String str, int minlen) { - if (str.length() >= minlen) - return str; - - StringBuffer out = new StringBuffer(); - out.append(str); - int pad = minlen - str.length(); - while (pad-- > 0) { - out.append(' '); - } - - return out.toString(); - } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-12-14 17:06:24
|
Revision: 4008 http://bigdata.svn.sourceforge.net/bigdata/?rev=4008&view=rev Author: thompsonbry Date: 2010-12-14 17:06:17 +0000 (Tue, 14 Dec 2010) Log Message: ----------- This is a re-do on a commit to fix a problem with RWStore where running with a ZERO (0) retention window would improperly recycle records while there was an open transaction. I thought that this change set was already committed, but clearly it was not. There is also a change to StorageStats and RWStore#showAllocators() to fix a DivideByZero problem (in the former) and to use the StorageStats in place of the older RWStore's self-reporting capabilities. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-14 16:56:43 UTC (rev 4007) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-14 17:06:17 UTC (rev 4008) @@ -1448,21 +1448,35 @@ * FIXME We need unit test when MIN_RELEASE_AGE is ZERO AND * there are open read-only transactions. */ - if (m_minReleaseAge == 0) { - immediateFree(addr, sze); + boolean alwaysDefer = m_minReleaseAge > 0L + || m_activeTxCount > 0; + if (!alwaysDefer) + alwaysDefer = context == null && !m_contexts.isEmpty(); + if (alwaysDefer) + if (log.isDebugEnabled()) + log.debug("Should defer " + addr + " real: " + + physicalAddress(addr)); + if (alwaysDefer + || !alloc.canImmediatelyFree(addr, sze, context)) { + deferFree(addr, sze); } else { - boolean alwaysDefer = m_activeTxCount > 0; - if (!alwaysDefer) - alwaysDefer = context == null && !m_contexts.isEmpty(); - if (alwaysDefer) - if (log.isDebugEnabled()) - log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); - if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { - deferFree(addr, sze); - } else { - immediateFree(addr, sze); - } + immediateFree(addr, sze); } +// if (m_minReleaseAge == 0) { +// immediateFree(addr, sze); +// } else { +// boolean alwaysDefer = m_activeTxCount > 0; +// if (!alwaysDefer) +// alwaysDefer = context == null && !m_contexts.isEmpty(); +// if (alwaysDefer) +// if (log.isDebugEnabled()) +// log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); +// if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { +// deferFree(addr, sze); +// } else { +// immediateFree(addr, sze); +// } +// } } } finally { m_allocationLock.unlock(); @@ -2596,85 +2610,68 @@ long m_reservedSlots; long m_filledSlots; } + /** - * Utility debug outputing the allocator array, showing index, start - * address and alloc type/size - * - * Collected statistics are against each Allocation Block size: - * total number of slots | store size - * number of filled slots | store used - * <dl> - * <dt>AllocatorSize</dt><dd>The #of bytes in the allocated slots issued by this allocator.</dd> - * <dt>AllocatorCount</dt><dd>The #of fixed allocators for that slot size.</dd> - * <dt>SlotsInUse</dt><dd>The difference between the two previous columns (net slots in use for this slot size).</dd> - * <dt>SlotsReserved</dt><dd>The #of slots in this slot size which have had storage reserved for them.</dd> - * <dt>SlotsAllocated</dt><dd>Cumulative allocation of slots to date in this slot size (regardless of the transaction outcome).</dd> - * <dt>SlotsRecycled</dt><dd>Cumulative recycled slots to date in this slot size (regardless of the transaction outcome).</dd> - * <dt>SlotsChurn</dt><dd>How frequently slots of this size are re-allocated (SlotsInUse/SlotsAllocated).</dd> - * <dt>%SlotsUnused</dt><dd>The percentage of slots of this size which are not in use (1-(SlotsInUse/SlotsReserved)).</dd> - * <dt>BytesReserved</dt><dd>The space reserved on the backing file for those allocation slots</dd> - * <dt>BytesAppData</dt><dd>The #of bytes in the allocated slots which are used by application data (including the record checksum).</dd> - * <dt>%SlotWaste</dt><dd>How well the application data fits in the slots (BytesAppData/(SlotsInUse*AllocatorSize)).</dd> - * <dt>%AppData</dt><dd>How much of your data is stored by each allocator (BytesAppData/Sum(BytesAppData)).</dd> - * <dt>%StoreFile</dt><dd>How much of the backing file is reserved for each allocator (BytesReserved/Sum(BytesReserved)).</dd> - * <dt>%StoreWaste</dt><dd>How much of the total waste on the store is waste for this allocator size ((BytesReserved-BytesAppData)/(Sum(BytesReserved)-Sum(BytesAppData))).</dd> - * </dl> + * Collected statistics are against each Allocation Block size. See + * {@link StorageStats#showStats(StringBuilder)} for details on the + * generated report. */ public void showAllocators(final StringBuilder str) { - final AllocationStats[] stats = new AllocationStats[m_allocSizes.length]; - for (int i = 0; i < stats.length; i++) { - stats[i] = new AllocationStats(m_allocSizes[i]*64); - } - - final Iterator<FixedAllocator> allocs = m_allocs.iterator(); - while (allocs.hasNext()) { - Allocator alloc = (Allocator) allocs.next(); - alloc.appendShortStats(str, stats); - } - - // Append Summary - str.append("\n-------------------------\n"); - str.append("RWStore Allocation Summary\n"); - str.append("-------------------------\n"); - str.append(padRight("Allocator", 10)); - str.append(padLeft("SlotsUsed", 12)); - str.append(padLeft("reserved", 12)); - str.append(padLeft("StoreUsed", 14)); - str.append(padLeft("reserved", 14)); - str.append(padLeft("Usage", 8)); - str.append(padLeft("Store", 8)); - str.append("\n"); - long treserved = 0; - long treservedSlots = 0; - long tfilled = 0; - long tfilledSlots = 0; - for (int i = 0; i < stats.length; i++) { - final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; - treserved += reserved; - treservedSlots += stats[i].m_reservedSlots; - final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; - tfilled += filled; - tfilledSlots += stats[i].m_filledSlots; - } - for (int i = 0; i < stats.length; i++) { - final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; - final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; - str.append(padRight("" + stats[i].m_blockSize, 10)); - str.append(padLeft("" + stats[i].m_filledSlots, 12) + padLeft("" + stats[i].m_reservedSlots, 12)); - str.append(padLeft("" + filled, 14) + padLeft("" + reserved, 14)); - str.append(padLeft("" + (reserved==0?0:(filled * 100 / reserved)) + "%", 8)); - str.append(padLeft("" + (treserved==0?0:(reserved * 100 / treserved)) + "%", 8)); - str.append("\n"); - } - str.append("\n"); - - str.append(padRight("Totals", 10)); - str.append(padLeft("" + tfilledSlots, 12)); - str.append(padLeft("" + treservedSlots, 12)); - str.append(padLeft("" + tfilled, 14)); - str.append(padLeft("" + treserved, 14)); - str.append(padLeft("" + (treserved==0?0:(tfilled * 100 / treserved)) + "%", 8)); - str.append("\nFile size: " + convertAddr(m_fileSize) + "bytes\n"); + m_storageStats.showStats(str); +// final AllocationStats[] stats = new AllocationStats[m_allocSizes.length]; +// for (int i = 0; i < stats.length; i++) { +// stats[i] = new AllocationStats(m_allocSizes[i]*64); +// } +// +// final Iterator<FixedAllocator> allocs = m_allocs.iterator(); +// while (allocs.hasNext()) { +// Allocator alloc = (Allocator) allocs.next(); +// alloc.appendShortStats(str, stats); +// } +// +// // Append Summary +// str.append("\n-------------------------\n"); +// str.append("RWStore Allocation Summary\n"); +// str.append("-------------------------\n"); +// str.append(padRight("Allocator", 10)); +// str.append(padLeft("SlotsUsed", 12)); +// str.append(padLeft("reserved", 12)); +// str.append(padLeft("StoreUsed", 14)); +// str.append(padLeft("reserved", 14)); +// str.append(padLeft("Usage", 8)); +// str.append(padLeft("Store", 8)); +// str.append("\n"); +// long treserved = 0; +// long treservedSlots = 0; +// long tfilled = 0; +// long tfilledSlots = 0; +// for (int i = 0; i < stats.length; i++) { +// final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; +// treserved += reserved; +// treservedSlots += stats[i].m_reservedSlots; +// final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; +// tfilled += filled; +// tfilledSlots += stats[i].m_filledSlots; +// } +// for (int i = 0; i < stats.length; i++) { +// final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; +// final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; +// str.append(padRight("" + stats[i].m_blockSize, 10)); +// str.append(padLeft("" + stats[i].m_filledSlots, 12) + padLeft("" + stats[i].m_reservedSlots, 12)); +// str.append(padLeft("" + filled, 14) + padLeft("" + reserved, 14)); +// str.append(padLeft("" + (reserved==0?0:(filled * 100 / reserved)) + "%", 8)); +// str.append(padLeft("" + (treserved==0?0:(reserved * 100 / treserved)) + "%", 8)); +// str.append("\n"); +// } +// str.append("\n"); +// +// str.append(padRight("Totals", 10)); +// str.append(padLeft("" + tfilledSlots, 12)); +// str.append(padLeft("" + treservedSlots, 12)); +// str.append(padLeft("" + tfilled, 14)); +// str.append(padLeft("" + treserved, 14)); +// str.append(padLeft("" + (treserved==0?0:(tfilled * 100 / treserved)) + "%", 8)); +// str.append("\nFile size: " + convertAddr(m_fileSize) + "bytes\n"); } private String padLeft(String str, int minlen) { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-12-14 16:56:43 UTC (rev 4007) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-12-14 17:06:17 UTC (rev 4008) @@ -31,7 +31,6 @@ import java.math.BigDecimal; import java.math.RoundingMode; import java.util.ArrayList; -import java.util.Formatter; /** * Maintains stats on the RWStore allocations, useful for tuning Allocator @@ -204,7 +203,7 @@ BigDecimal size = new BigDecimal(reservedStore()); BigDecimal store = new BigDecimal(100 * (reservedStore() - usedStore())); - + if(store.signum()==0) return 0f; return store.divide(size, 2, RoundingMode.HALF_UP).floatValue(); } public float totalWaste(long total) { @@ -215,7 +214,7 @@ BigDecimal localWaste = new BigDecimal(100 * slotWaste); BigDecimal totalWaste = new BigDecimal(total); - + if(totalWaste.signum()==0) return 0f; return localWaste.divide(totalWaste, 2, RoundingMode.HALF_UP).floatValue(); } public long reservedStore() { @@ -231,25 +230,25 @@ BigDecimal allocs = new BigDecimal(m_slotAllocations); BigDecimal used = new BigDecimal(usedSlots()); - + if(used.signum()==0) return 0f; return allocs.divide(used, 2, RoundingMode.HALF_UP).floatValue(); } public float slotsUnused() { BigDecimal used = new BigDecimal(100 * (m_totalSlots-usedSlots())); BigDecimal total = new BigDecimal(m_totalSlots); - + if(total.signum()==0) return 0f; return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); } public float percentAllocations(long totalAllocations) { BigDecimal used = new BigDecimal(100 * m_slotAllocations); BigDecimal total = new BigDecimal(totalAllocations); - + if(total.signum()==0) return 0f; return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); } public float percentSlotsInuse(long totalInuse) { BigDecimal used = new BigDecimal(100 * usedSlots()); BigDecimal total = new BigDecimal(totalInuse); - + if(total.signum()==0) return 0f; return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); } public int meanAllocation() { @@ -384,7 +383,40 @@ public void register(FixedAllocator alloc) { register(alloc, false); } - + + /** + * Collected statistics are against each Allocation Block size: + * <dl> + * <dt>AllocatorSize</dt><dd>The #of bytes in the allocated slots issued by this allocator.</dd> + * <dt>AllocatorCount</dt><dd>The #of fixed allocators for that slot size.</dd> + * <dt>SlotsInUse</dt><dd>The difference between the two previous columns (net slots in use for this slot size).</dd> + * <dt>SlotsReserved</dt><dd>The #of slots in this slot size which have had storage reserved for them.</dd> + * <dt>SlotsAllocated</dt><dd>Cumulative allocation of slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsRecycled</dt><dd>Cumulative recycled slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsChurn</dt><dd>How frequently slots of this size are re-allocated (SlotsInUse/SlotsAllocated).</dd> + * <dt>%SlotsUnused</dt><dd>The percentage of slots of this size which are not in use (1-(SlotsInUse/SlotsReserved)).</dd> + * <dt>BytesReserved</dt><dd>The space reserved on the backing file for those allocation slots</dd> + * <dt>BytesAppData</dt><dd>The #of bytes in the allocated slots which are used by application data (including the record checksum).</dd> + * <dt>%SlotWaste</dt><dd>How well the application data fits in the slots (BytesAppData/(SlotsInUse*AllocatorSize)).</dd> + * <dt>%AppData</dt><dd>How much of your data is stored by each allocator (BytesAppData/Sum(BytesAppData)).</dd> + * <dt>%StoreFile</dt><dd>How much of the backing file is reserved for each allocator (BytesReserved/Sum(BytesReserved)).</dd> + * <dt>%StoreWaste</dt><dd>How much of the total waste on the store is waste for this allocator size ((BytesReserved-BytesAppData)/(Sum(BytesReserved)-Sum(BytesAppData))).</dd> + * </dl> + * + * @param str + * + * FIXME Javadoc edit - this has diverged from the comments above. Also, there + * is also a divideByZero which can appear (this has been fixed).<pre> + [java] Exception in thread "main" java.lang.ArithmeticException: / by zero + [java] at java.math.BigDecimal.divideAndRound(BigDecimal.java:1407) + [java] at java.math.BigDecimal.divide(BigDecimal.java:1381) + [java] at java.math.BigDecimal.divide(BigDecimal.java:1491) + [java] at com.bigdata.rwstore.StorageStats$Bucket.slotsUnused(StorageStats.java:240) + [java] at com.bigdata.rwstore.StorageStats.showStats(StorageStats.java:448) + [java] at com.bigdata.rwstore.RWStore.showAllocators(RWStore.java:2620) + [java] at com.bigdata.rdf.store.DataLoader.main(DataLoader.java:1415) + </pre> + */ public void showStats(StringBuilder str) { str.append("\n-------------------------\n"); str.append("RWStore Allocator Summary\n"); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-12-17 15:05:12
|
Revision: 4012 http://bigdata.svn.sourceforge.net/bigdata/?rev=4012&view=rev Author: martyncutcher Date: 2010-12-17 15:05:06 +0000 (Fri, 17 Dec 2010) Log Message: ----------- Relax session address checking to allow read on any transient bit - either session protected or committed. Resolves problem reading committed CommitRecords from CommitRecordIndex Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-12-16 14:28:33 UTC (rev 4011) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-12-17 15:05:06 UTC (rev 4012) @@ -113,9 +113,14 @@ final int bit = offset % allocBlockRange; - if (RWStore.tstBit(block.m_live, bit) - || (m_sessionActive && RWStore.tstBit(block.m_transients, bit))) - { +// if (RWStore.tstBit(block.m_live, bit) +// || (m_sessionActive && RWStore.tstBit(block.m_transients, bit))) + /* + * Just check transients since there are case (eg CommitRecordIndex) + * where committed data is accessed even if has been marked as ready to + * be recycled after the next commit + */ + if (RWStore.tstBit(block.m_transients, bit)) { return RWStore.convertAddr(block.m_addr) + ((long) m_size * bit); } else { return 0L; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-16 14:28:33 UTC (rev 4011) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-17 15:05:06 UTC (rev 4012) @@ -1253,9 +1253,9 @@ readLock.lock(); - assertOpen(); // check again after taking lock + try { + assertOpen(); // check again after taking lock - try { // length includes space for the checksum if (length > m_maxFixedAlloc) { try { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2010-12-22 09:50:03
|
Revision: 4040 http://bigdata.svn.sourceforge.net/bigdata/?rev=4040&view=rev Author: martyncutcher Date: 2010-12-22 09:49:57 +0000 (Wed, 22 Dec 2010) Log Message: ----------- 1) Fixes freeList management associated with releaseSession 2) Fixes AllocationContext freelist associations with FixedAllocators Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-12-21 23:07:08 UTC (rev 4039) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-12-22 09:49:57 UTC (rev 4040) @@ -313,8 +313,11 @@ * but not in the recalculated transient. Tested with new &= ~old; * * @param cache + * @return the number of allocations released */ - public void releaseSession(RWWriteCacheService cache) { + public int releaseSession(RWWriteCacheService cache) { + int freebits = 0; + if (m_addr != 0) { // check active! for (int i = 0; i < m_live.length; i++) { int chkbits = m_transients[i]; @@ -332,11 +335,15 @@ log.trace("releasing address: " + clr); cache.clearWrite(clr); + + freebits++; } } } } } + + return freebits; } public String show() { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-12-21 23:07:08 UTC (rev 4039) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-12-22 09:49:57 UTC (rev 4040) @@ -517,21 +517,8 @@ if (((AllocBlock) m_allocBlocks.get(block)) .freeBit(offset % nbits, m_sessionActive && !overideSession)) { // bit adjust - // Only add back to the free list this is a DirectFixedAllocator - // or the freeBits exceed the cDefaultFreeBitsThreshold - // If a DirectFixedAllocator then also ensure it is added to the - // front of the free list - if (m_freeBits++ == 0 && this instanceof DirectFixedAllocator) { - m_freeWaiting = false; - m_freeList.add(0, this); - } else if (m_freeWaiting && m_freeBits == m_store.cDefaultFreeBitsThreshold) { - m_freeWaiting = false; - - if (log.isDebugEnabled()) - log.debug("Returning Allocator to FreeList - " + m_size); - - m_freeList.add(this); - } + m_freeBits++; + checkFreeList(); } else { m_freeTransients++; } @@ -557,6 +544,22 @@ return false; } + private void checkFreeList() { + if (m_freeWaiting) { + if (m_freeBits > 0 && this instanceof DirectFixedAllocator) { + m_freeWaiting = false; + m_freeList.add(0, this); + } else if (m_freeBits >= m_store.cDefaultFreeBitsThreshold) { + m_freeWaiting = false; + + if (log.isDebugEnabled()) + log.debug("Returning Allocator to FreeList - " + m_size); + + m_freeList.add(this); + } + } + } + /** * The introduction of IAllocationContexts has added some complexity to * the older concept of a free list. With AllocationContexts it is @@ -803,9 +806,17 @@ if (this.m_sessionActive) { if (log.isTraceEnabled()) log.trace("Allocator: #" + m_index + " releasing session protection"); + + int releasedAllocations = 0; for (AllocBlock ab : m_allocBlocks) { - ab.releaseSession(cache); + releasedAllocations += ab.releaseSession(cache); } + + m_freeBits += releasedAllocations; + m_freeTransients -= releasedAllocations; + + checkFreeList(); + } } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-21 23:07:08 UTC (rev 4039) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-22 09:49:57 UTC (rev 4040) @@ -3556,6 +3556,10 @@ } + /** + * Must return the shadowed allocators to the parent/global + * environment, resetting the freeList association. + */ void release() { final ArrayList<FixedAllocator> freeFixed[] = m_parent != null ? m_parent.m_freeFixed : m_store.m_freeFixed; @@ -3565,6 +3569,7 @@ for (FixedAllocator f : m_allFixed) { f.setAllocationContext(pcontext); + f.setFreeList(freeFixed[m_store.fixedAllocatorIndex(f.m_size)]); } for (int i = 0; i < m_freeFixed.length; i++) { @@ -3597,6 +3602,7 @@ if (free.size() == 0) { final FixedAllocator falloc = establishFixedAllocator(i); falloc.setAllocationContext(m_context); + falloc.setFreeList(free); free.add(falloc); m_allFixed.add(falloc); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |