From: <mar...@us...> - 2010-12-16 13:05:00
|
Revision: 4010 http://bigdata.svn.sourceforge.net/bigdata/?rev=4010&view=rev Author: martyncutcher Date: 2010-12-16 12:44:43 +0000 (Thu, 16 Dec 2010) Log Message: ----------- 1) Fixes problem on RWStore open where a FixedAllocator can be associated with an incorrect free list. 2) Fixes issue with session protection interaction with WriteCacheService. ReleaseSession now clears transient writes from cache. 3) Fixes problem with session protection with FixedAllocator freebit count not kept in sync. Clears reported exceptions from concurrent stress tests. 4) Added extra validation to RWStore allocation access, validating IO requests against slot sizes. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/BufferMode.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestConcurrentJournal.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectFixedAllocator.java Modified: 
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -992,6 +992,20 @@ } + case TemporaryRW: { + + /* + * Setup the buffer strategy. + */ + + _bufferStrategy = new RWStrategy(fileMetadata, quorum); + + this._rootBlock = fileMetadata.rootBlock; + + break; + + } + case Temporary: { /* @@ -2354,6 +2368,14 @@ * the store. */ final long commitRecordIndexAddr = _commitRecordIndex.writeCheckpoint(); + + /* + * DEBUG: The commitRecordIndexAddr should not be deleted, the + * call to lockAddress forces a runtime check protecting the address + */ + if (_bufferStrategy instanceof RWStrategy) { + ((RWStrategy) _bufferStrategy).lockAddress(commitRecordIndexAddr); + } if (quorum != null) { /* Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/BufferMode.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/BufferMode.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/BufferMode.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -139,6 +139,18 @@ /** * <p> + * A variant on the DiskRW backed by a temporary file. Options enable + * part of the store to be held with Direct ByteBuffers. A significant + * use case would be an in-memory store but with disk overflow if + * required. + * </p> + * + * @see RWStrategy + */ + TemporaryRW(false/* stable */, false/* fullyBuffered */,StoreTypeEnum.RW), + + /** + * <p> * A variant on the {@link #Disk} mode that is not restart-safe. 
This mode * is useful for all manners of temporary data with full concurrency control * and scales-up to very large temporary files. The backing file (if any) is Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -1262,14 +1262,14 @@ DiskOnlyStrategy(final long maximumExtent, final FileMetadata fileMetadata) { super(fileMetadata.extent, maximumExtent, fileMetadata.offsetBits, - fileMetadata.nextOffset, fileMetadata.bufferMode, + fileMetadata.nextOffset, fileMetadata.getBufferMode(), fileMetadata.readOnly); this.file = fileMetadata.file; this.fileMode = fileMetadata.fileMode; - this.temporaryStore = (fileMetadata.bufferMode==BufferMode.Temporary); + this.temporaryStore = (fileMetadata.getBufferMode()==BufferMode.Temporary); this.raf = fileMetadata.raf; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -802,6 +802,7 @@ log.info("Mapping file=" + file); buffer = opener.reopenChannel().map(FileChannel.MapMode.READ_WRITE, headerSize0, userExtent); break; + case TemporaryRW: case DiskRW: buffer = null; break; @@ -1514,4 +1515,8 @@ return getProperty(properties, name, defaultValue); } + public BufferMode getBufferMode() { + return bufferMode; + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 
=================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -633,4 +633,14 @@ } + /** + * An assert oriented method that allows a finite number of addresses + * to be monitored to ensure it is not freed. + * + * @param addr - address to be locked + */ + public void lockAddress(final long addr) { + m_store.lockAddress(decodeAddr(addr)); + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -822,14 +822,14 @@ final Quorum<?, ?> quorum) { super(fileMetadata.extent, maximumExtent, fileMetadata.offsetBits, - fileMetadata.nextOffset, fileMetadata.bufferMode, + fileMetadata.nextOffset, fileMetadata.getBufferMode(), fileMetadata.readOnly); this.file = fileMetadata.file; this.fileMode = fileMetadata.fileMode; - this.temporaryStore = (fileMetadata.bufferMode==BufferMode.Temporary); + this.temporaryStore = (fileMetadata.getBufferMode()==BufferMode.Temporary); this.raf = fileMetadata.raf; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -26,6 +26,8 @@ import java.util.ArrayList; +import org.apache.log4j.Logger; + import 
com.bigdata.rwstore.RWStore.AllocationStats; /** @@ -43,6 +45,15 @@ * @todo change to use long[]s. */ public class AllocBlock { + + private static final Logger log = Logger.getLogger(AllocBlock.class); + + /** + * The FixedAllocator owning this block. The callback reference is needed + * to allow the AllocBlock to determine the session state and whether to + * clear the transient bits. + */ + final FixedAllocator m_allocator; /** * The address of the {@link AllocBlock} -or- ZERO (0) if {@link AllocBlock} * has not yet been allocated on the persistent heap. Note that the space @@ -82,8 +93,9 @@ // */ // private final RWWriteCacheService m_writeCache; - AllocBlock(final int addrIsUnused, final int bitSize) {//, final RWWriteCacheService cache) { + AllocBlock(final int addrIsUnused, final int bitSize, final FixedAllocator allocator) {//, final RWWriteCacheService cache) { // m_writeCache = cache; + m_allocator = allocator; m_ints = bitSize; m_commit = new int[bitSize]; m_live = new int[bitSize]; @@ -116,6 +128,16 @@ } public boolean freeBit(final int bit) { + // by default do NOT session protect, the 2 argument call is made + // directly from the RWStore that has access to session and transaction + // state + return freeBit(bit, false); + } + + /* + * + */ + public boolean freeBit(final int bit, final boolean sessionProtect) { if (!RWStore.tstBit(m_live, bit)) { throw new IllegalArgumentException("Freeing bit not set"); } @@ -128,17 +150,35 @@ * Note that with buffered IO there is also an opportunity to avoid * output to the file by removing any pending write to the now freed * address. On large transaction scopes this may be significant. + * + * The sessionProtect parameter indicates whether we really should + * continue to protect this allocation by leaving the transient bit + * set. 
For general session protection we should, BUT if allocation + * contexts have been used we can allow immediate recycling and this + * is set up by the caller */ RWStore.clrBit(m_live, bit); + + if (log.isTraceEnabled()) { + log.trace("Freeing " + bitPhysicalAddress(bit) + " sessionProtect: " + sessionProtect); + } - if (!RWStore.tstBit(m_commit, bit)) { - RWStore.clrBit(m_transients, bit); - - return true; + if (!sessionProtect) { + if (!RWStore.tstBit(m_commit, bit)) { + RWStore.clrBit(m_transients, bit); + + return true; + } else { + return false; + } } else { return false; } } + + private long bitPhysicalAddress(int bit) { + return RWStore.convertAddr(m_addr) + ((long) m_allocator.m_size * bit); + } /** * The shadow, if non-null defines the context for this request. @@ -266,14 +306,45 @@ * of the committed bits and the live bits, but rather an ORing of the live * with all the committed bits since the start of the session. * When the session is released, the state is restored to an ORing of the - * live and the committed, thus releasing slots for re-allocation. + * live and the committed, thus releasing slots for re-allocation. + * + * For each transient bit, check if cleared and ensure any write is removed + * from the write cache. Where the bit is set in the session protected + * but not in the recalculated transient. Tested with old &= ~new; + * + * @param cache */ - public void releaseSession() { + public void releaseSession(RWWriteCacheService cache) { if (m_addr != 0) { // check active! 
for (int i = 0; i < m_live.length; i++) { + int chkbits = m_transients[i]; m_transients[i] = m_live[i] | m_commit[i]; + chkbits &= ~m_transients[i]; + + if (chkbits != 0) { + // there are writes to clear + for (int b = 0; b < 32; b++) { + if ((chkbits & (1 << b)) != 0) { + long clr = RWStore.convertAddr(m_addr) + ((long) m_allocator.m_size * b); + + if (log.isTraceEnabled()) + log.trace("releasing address: " + clr); + + cache.clearWrite(clr); + } + } + } } } } + public String show() { + StringBuilder sb = new StringBuilder(); + sb.append("AllocBlock, baseAddress: " + RWStore.convertAddr(m_addr) + " bits: "); + for (int b: m_transients) + sb.append(b + " "); + + return sb.toString(); + } + } Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectFixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectFixedAllocator.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectFixedAllocator.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -0,0 +1,18 @@ +package com.bigdata.rwstore; + +/** + * The DirectFixedAllocator is used to manage in-memory Direct ByteBuffer + * allocated memory. 
+ * + */ +public class DirectFixedAllocator extends FixedAllocator { + + DirectFixedAllocator(RWStore store, int size) { + super(store, size); + } + + protected int grabAllocation(RWStore store, int blockSize) { + return store.allocateDirect(blockSize); + } + +} Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectFixedAllocator.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -114,7 +114,7 @@ final int bit = offset % allocBlockRange; if (RWStore.tstBit(block.m_live, bit) - || (this.m_sessionActive && RWStore.tstBit(block.m_transients, bit))) + || (m_sessionActive && RWStore.tstBit(block.m_transients, bit))) { return RWStore.convertAddr(block.m_addr) + ((long) m_size * bit); } else { @@ -155,7 +155,7 @@ * store from re-allocating allocations reachable from read-only * requests and concurrent transactions. 
*/ - private boolean m_sessionActive; + boolean m_sessionActive; public void setAllocationContext(final IAllocationContext context) { if (context == null && m_context != null) { @@ -196,13 +196,11 @@ public byte[] write() { try { final AllocBlock fb = m_allocBlocks.get(0); - if (log.isDebugEnabled()) - log.debug("writing allocator " + m_index + " for " + getStartAddr() + " with " + fb.m_live[0]); + if (log.isTraceEnabled()) + log.trace("writing allocator " + m_index + " for " + getStartAddr() + " with " + fb.m_live[0]); final byte[] buf = new byte[1024]; final DataOutputStream str = new DataOutputStream(new FixedOutputStream(buf)); try { - m_sessionActive = m_store.isSessionProtected(); - str.writeInt(m_size); final Iterator<AllocBlock> iter = m_allocBlocks.iterator(); @@ -240,19 +238,19 @@ str.close(); } -// if (!m_store.isSessionPreserved()) { - m_freeBits += m_freeTransients; - - // Handle re-addition to free list once transient frees are - // added back - if ((m_freeTransients == m_freeBits) && (m_freeTransients != 0)) { - m_freeList.add(this); - m_freeWaiting = false; + if (!this.m_sessionActive) { + m_freeBits += m_freeTransients; + + // Handle re-addition to free list once transient frees are + // added back + if ((m_freeTransients == m_freeBits) && (m_freeTransients != 0)) { + m_freeList.add(this); + m_freeWaiting = false; + } + + m_freeTransients = 0; } - m_freeTransients = 0; -// } - return buf; } catch (IOException e) { throw new StorageTerminalError("Error on write", e); @@ -309,7 +307,7 @@ } /** The size of the allocation slots in bytes. 
*/ - private final int m_size; + final int m_size; private int m_startAddr = 0; private int m_endAddr = 0; @@ -343,11 +341,13 @@ m_size = size; - m_bitSize = calcBitSize(true, size, cMinAllocation, cModAllocation); + // By default, disk-based allocators should optimise for density + m_bitSize = calcBitSize(true /* optDensity */, size, cMinAllocation, cModAllocation); -// m_writeCache = cache; - // number of blocks in this allocator, bitSize plus 1 for start address + // The 1K allocator is 256 ints, one is used to record the slot size and + // another for the checksum; leaving 254 to be used to store the + // AllocBlocks. final int numBlocks = 254 / (m_bitSize + 1); /* @@ -357,7 +357,7 @@ */ m_allocBlocks = new ArrayList<AllocBlock>(numBlocks); for (int i = 0; i < numBlocks; i++) { - m_allocBlocks.add(new AllocBlock(0, m_bitSize));//, cache)); + m_allocBlocks.add(new AllocBlock(0, m_bitSize, this));//, cache)); } m_freeTransients = 0; @@ -415,7 +415,7 @@ * content and 1 more for the header). A variation on the current Blob * implementation could include the header in the first allocation, thus * reducing the minimum Blob allocations from 3 to 2, but the point still - * holds that too small a max fixed allocation could rmatically reduce the + * holds that too small a max fixed allocation could dramatically reduce the * number of allocations that could be made. 
* * @param alloc the slot size to be managed @@ -435,8 +435,6 @@ while ((nints * intAllocation) % modAllocation != 0) nints++; -// System.out.println("calcBitSize for " + alloc + " returns " + nints); - return nints; } @@ -498,6 +496,10 @@ private boolean m_freeWaiting = true; public boolean free(final int addr, final int size) { + return free(addr, size, false); + } + + public boolean free(final int addr, final int size, final boolean overideSession) { if (addr < 0) { final int offset = ((-addr) & RWStore.OFFSET_BITS_MASK) - 3; // bit adjust @@ -505,15 +507,24 @@ final int block = offset/nbits; + m_sessionActive = m_store.isSessionProtected(); + if (((AllocBlock) m_allocBlocks.get(block)) - .freeBit(offset % nbits)) { // bit adjust + .freeBit(offset % nbits, m_sessionActive && !overideSession)) { // bit adjust - // Only add back to the free list if at least 3000 bits avail - if (m_freeBits++ == 0 && false) { + // Only add back to the free list if this is a DirectFixedAllocator + // or the freeBits exceed the cDefaultFreeBitsThreshold + // If a DirectFixedAllocator then also ensure it is added to the + // front of the free list + if (m_freeBits++ == 0 && this instanceof DirectFixedAllocator) { m_freeWaiting = false; - m_freeList.add(this); + m_freeList.add(0, this); } else if (m_freeWaiting && m_freeBits == m_store.cDefaultFreeBitsThreshold) { m_freeWaiting = false; + + if (log.isDebugEnabled()) + log.debug("Returning Allocator to FreeList - " + m_size); + m_freeList.add(this); } } else { @@ -554,6 +565,11 @@ throw new IllegalArgumentException( "Allocate requires positive size, got: " + size); + if (size > m_size) + throw new IllegalArgumentException( + "FixedAllocator with slots of " + m_size + + " bytes requested allocation for "+ size + " bytes"); + int addr = -1; final Iterator<AllocBlock> iter = m_allocBlocks.iterator(); @@ -570,9 +586,9 @@ blockSize *= m_size; blockSize >>= RWStore.ALLOCATION_SCALEUP; - block.m_addr = store.allocBlock(blockSize); - if 
(log.isInfoEnabled()) - log.info("Allocation block at " + block.m_addr + " of " + (blockSize << 16) + " bytes"); + block.m_addr = grabAllocation(store, blockSize); + if (log.isDebugEnabled()) + log.debug("Allocation block at " + block.m_addr + " of " + (blockSize << 16) + " bytes"); if (m_startAddr == 0) { m_startAddr = block.m_addr; @@ -583,8 +599,9 @@ } if (addr != -1) { - addr += 3; // Tweak to ensure non-zero address for offset 0 + addr += 3; // Tweak to ensure non-zero address for offset 0 + if (--m_freeBits == 0) { if (log.isTraceEnabled()) log.trace("Remove from free list"); @@ -593,9 +610,10 @@ // Should have been first on list, now check for first if (m_freeList.size() > 0) { - final FixedAllocator nxt = (FixedAllocator) m_freeList.get(0); - if (log.isInfoEnabled()) - log.info("Freelist head: " + nxt.getSummaryStats()); + if (log.isDebugEnabled()) { + final FixedAllocator nxt = (FixedAllocator) m_freeList.get(0); + log.debug("Freelist head: " + nxt.getSummaryStats()); + } } } @@ -606,16 +624,28 @@ if (m_statsBucket != null) { m_statsBucket.allocate(size); } - + return value; } else { - if (log.isTraceEnabled()) - log.trace("FixedAllocator returning null address"); - + if (log.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append("FixedAllocator returning null address, with freeBits: " + m_freeBits + "\n"); + + for (AllocBlock ab: m_allocBlocks) { + sb.append(ab.show() + "\n"); + } + + log.debug(sb); + } + return 0; } } + protected int grabAllocation(RWStore store, int blockSize) { + return store.allocBlock(blockSize); + } + public boolean hasFree() { return m_freeBits > 0; } @@ -764,12 +794,12 @@ m_statsBucket = b; } - public void releaseSession() { + public void releaseSession(RWWriteCacheService cache) { if (this.m_sessionActive) { if (log.isTraceEnabled()) log.trace("Allocator: #" + m_index + " releasing session protection"); for (AllocBlock ab : m_allocBlocks) { - ab.releaseSession(); + ab.releaseSession(cache); } } } Modified: 
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -77,6 +77,12 @@ */ public void getData(long l, byte buf[]); + /************************************************************** + * @param addr - the address + * @return the size of the slot associated + */ + public int getAssociatedSlotSize(int addr); + // /************************************************************** // * Given a physical address (byte offset on the store), return true // * if that address could be managed by an allocated block. Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -37,6 +37,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -54,6 +56,7 @@ import com.bigdata.counters.CounterSet; import com.bigdata.counters.Instrument; import com.bigdata.counters.striped.StripedCounters; +import com.bigdata.io.DirectBufferPool; import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IReopenChannel; import com.bigdata.io.writecache.BufferedWrite; @@ -61,6 +64,7 @@ import com.bigdata.io.writecache.WriteCacheService; import com.bigdata.journal.AbstractBufferStrategy; import com.bigdata.journal.AbstractJournal; +import 
com.bigdata.journal.BufferMode; import com.bigdata.journal.CommitRecordIndex; import com.bigdata.journal.CommitRecordSerializer; import com.bigdata.journal.FileMetadata; @@ -194,9 +198,9 @@ * <p> * Add metabits header record checksum field and verify on read back. * <p> - * Checksum fixed allocators (needs to be tested on read back). + * Done. Checksum fixed allocators (needs to be tested on read back). * <p> - * Add version field to the fixed allocator. + * Done. Add version field to the fixed allocator. * <p> * Done. Checksum delete blocks / blob records. * <p> @@ -207,7 +211,7 @@ * Modify FixedAllocator to use arrayCopy() rather than clone and * declare more fields to be final. See notes on {@link AllocBlock}. * <p> - * Implement logic to "abort" a shadow allocation context. + * Done. Implement logic to "abort" a shadow allocation context. * <p> * Unit test to verify that we do not recycle allocations from the last * commit point even when the retention time is zero such that it is @@ -335,6 +339,12 @@ // m_commitCallback = callback; // } + // If required, then allocate 1M direct buffers + private static final int cDirectBufferCapacity = 1024 * 1024; + + private int cMaxDirectBuffers = 20; // 20M of direct buffers + static final int cDirectAllocationOffset = 64 * 1024; + // /////////////////////////////////////////////////////////////////////////////////////// // RWStore Data // /////////////////////////////////////////////////////////////////////////////////////// @@ -483,11 +493,26 @@ private StorageStats m_storageStats; private long m_storageStatsAddr = 0; + /** + * Direct ByteBuffer allocations. + * + * TODO: Support different scaleups for disk and direct allocation to + * allow for finer granularity of allocation. For example, a 1K + * scaleup would allow 32bit slot allocations for all slot sizes. 
+ */ + private int m_directSpaceAvailable = 0; + private int m_nextDirectAllocation = cDirectAllocationOffset; + private ArrayList<ByteBuffer> m_directBuffers = null; + + private final boolean m_enableDirectBuffer; + /** * <code>true</code> iff the backing store is open. */ private volatile boolean m_open = true; + private TreeMap<Integer, Integer> m_lockAddresses = null; + class WriteCacheImpl extends WriteCache.FileChannelScatteredWriteCache { public WriteCacheImpl(final ByteBuffer buf, final boolean useChecksum, @@ -549,13 +574,23 @@ if (fileMetadata == null) throw new IllegalArgumentException(); - this.m_minReleaseAge = LongValidator.GTE_ZERO.parse( + this.m_minReleaseAge = Long.valueOf(fileMetadata.getProperty( AbstractTransactionService.Options.MIN_RELEASE_AGE, - AbstractTransactionService.Options.DEFAULT_MIN_RELEASE_AGE); + AbstractTransactionService.Options.DEFAULT_MIN_RELEASE_AGE)); if (log.isInfoEnabled()) log.info(AbstractTransactionService.Options.MIN_RELEASE_AGE + "=" + m_minReleaseAge); + /* + * Disable TemporaryRW option for now + */ + // m_enableDirectBuffer = fileMetadata.getBufferMode() == BufferMode.TemporaryRW; + m_enableDirectBuffer = false; + + if (m_enableDirectBuffer) { + m_directBuffers = new ArrayList<ByteBuffer>(); + addDirectBuffer(); + } cDefaultMetaBitsSize = Integer.valueOf(fileMetadata.getProperty( Options.META_BITS_SIZE, @@ -673,6 +708,8 @@ for (FixedAllocator fa: m_allocs) { m_storageStats.register(fa); } + } else { + m_storageStats = new StorageStats(m_allocSizes); } } @@ -694,7 +731,15 @@ } } - private void setAllocations(final FileMetadata fileMetadata) + private void addDirectBuffer() { + if (cMaxDirectBuffers > m_directBuffers.size()) { + ByteBuffer bbuf = ByteBuffer.allocateDirect(cDirectBufferCapacity); + m_directBuffers.add(bbuf); + m_directSpaceAvailable += cDirectBufferCapacity; + } + } + + private void setAllocations(final FileMetadata fileMetadata) throws IOException { final String buckets = fileMetadata.getProperty( @@ 
-1017,11 +1062,20 @@ final ArrayList<? extends Allocator> freeList; assert allocSize > 0; + // m_minFixedAlloc and m_maxFixedAlloc may not be set since + // as finals they must be set in the constructor. Therefore + // recalculate for local load + final int minFixedAlloc = 64 * m_allocSizes[0]; + final int maxFixedAlloc = 64 * m_allocSizes[m_allocSizes.length-1]; int index = 0; - int fixedSize = m_minFixedAlloc; - while (fixedSize < allocSize) + int fixedSize = minFixedAlloc; + while (fixedSize < allocSize && fixedSize < maxFixedAlloc) fixedSize = 64 * m_allocSizes[++index]; + if (allocSize != fixedSize) { + throw new IllegalStateException("Unexpected allocator size: " + + allocSize + " != " + fixedSize); + } allocator = new FixedAllocator(this, allocSize);//, m_writeCache); freeList = m_freeFixed[index]; @@ -1056,13 +1110,6 @@ for (int index = 0; index < m_allocs.size(); index++) { ((Allocator) m_allocs.get(index)).setIndex(index); } - - if (false) { - StringBuilder tmp = new StringBuilder(); - showAllocators(tmp); - - System.out.println("Allocators: " + tmp.toString()); - } } /** @@ -1206,6 +1253,8 @@ readLock.lock(); + assertOpen(); // check again after taking lock + try { // length includes space for the checksum if (length > m_maxFixedAlloc) { @@ -1277,6 +1326,10 @@ } try { + + if (getBlock((int) addr).getBlockSize() < length) { + throw new IllegalStateException("Bad Address: length requested greater than allocated slot"); + } final long paddr = physicalAddress((int) addr); @@ -1287,6 +1340,12 @@ throw new PhysicalAddressResolutionException(addr); } + + if (paddr < 0) { // read from Direct ByteBuffer + directRead(paddr, buf, offset, length); + + return; + } /** * Check WriteCache first @@ -1382,6 +1441,69 @@ } } + /** + * Retrieves data from the direct byte buffers, must handle transfers across + * multiple buffers + */ + private void directRead(final long paddr, final byte[] buf, final int offset, final int length) { + assert paddr < 0; + assert 
m_directBuffers != null; + + final int baddr = (int) (-paddr) - cDirectAllocationOffset; // buffer address + int bufIndex = baddr / cDirectBufferCapacity; + int bufOffset = baddr % cDirectBufferCapacity; + + int transfer = 0; + int curOut = offset; + + while (transfer < length) { + ByteBuffer direct = m_directBuffers.get(bufIndex); + direct.position(bufOffset); + int avail = cDirectBufferCapacity - bufOffset; + int req = length - transfer; + int tlen = avail < req ? avail : req; + + direct.get(buf, curOut, tlen); + + transfer += tlen; + curOut += tlen; + + bufIndex++; + bufOffset = 0; + } + } + + /** + * Writes to direct buffers, transferring across boundaries as required + */ + private void directWrite(final long pa, final byte[] buf, final int offset, final int length, final int chk) { + assert pa < 0; + assert m_directBuffers != null; + + final int baddr = (int) (-pa) - cDirectAllocationOffset; // buffer address + int bufIndex = baddr / cDirectBufferCapacity; + int bufOffset = baddr % cDirectBufferCapacity; + + int transfer = 0; + int curIn = offset; + + while (transfer < length) { + ByteBuffer direct = m_directBuffers.get(bufIndex); + direct.position(bufOffset); + int avail = cDirectBufferCapacity - bufOffset; + int req = length - transfer; + int tlen = avail < req ? 
avail : req; + + direct.put(buf, curIn, tlen); + + transfer += tlen; + curIn += tlen; + + bufIndex++; + bufOffset = 0; + } + } + private void assertAllocators() { for (int i = 0; i < m_allocs.size(); i++) { if (m_allocs.get(i).getIndex() != i) { @@ -1434,7 +1556,7 @@ public void free(final long laddr, final int sze, final IAllocationContext context) { assertOpen(); final int addr = (int) laddr; - + switch (addr) { case 0: case -1: @@ -1443,6 +1565,9 @@ } m_allocationLock.lock(); try { + if (m_lockAddresses != null && m_lockAddresses.containsKey((int)laddr)) + throw new IllegalStateException("address locked: " + laddr); + if (sze > m_maxFixedAlloc-4) { freeBlob(addr, sze, context); } else { @@ -1464,35 +1589,32 @@ * FIXME We need unit test when MIN_RELEASE_AGE is ZERO AND * there are open read-only transactions. */ - boolean alwaysDefer = m_minReleaseAge > 0L - || m_activeTxCount > 0; - if (!alwaysDefer) - alwaysDefer = context == null && !m_contexts.isEmpty(); - if (alwaysDefer) - if (log.isDebugEnabled()) - log.debug("Should defer " + addr + " real: " - + physicalAddress(addr)); - if (alwaysDefer - || !alloc.canImmediatelyFree(addr, sze, context)) { - deferFree(addr, sze); + if (m_minReleaseAge == 0) { + /* + * The session protection is complicated by the mix of + * transaction protection and isolated AllocationContexts. 
+ */ + if (this.isSessionProtected()) { + + immediateFree(addr, sze, context != null && alloc.canImmediatelyFree(addr, sze, context)); + } else { + immediateFree(addr, sze); + } } else { - immediateFree(addr, sze); + boolean alwaysDefer = m_activeTxCount > 0; + + if (!alwaysDefer) + alwaysDefer = context == null && !m_contexts.isEmpty(); + + if (alwaysDefer) + if (log.isDebugEnabled()) + log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); + if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { + deferFree(addr, sze); + } else { + immediateFree(addr, sze); + } } -// if (m_minReleaseAge == 0) { -// immediateFree(addr, sze); -// } else { -// boolean alwaysDefer = m_activeTxCount > 0; -// if (!alwaysDefer) -// alwaysDefer = context == null && !m_contexts.isEmpty(); -// if (alwaysDefer) -// if (log.isDebugEnabled()) -// log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); -// if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { -// deferFree(addr, sze); -// } else { -// immediateFree(addr, sze); -// } -// } } } finally { m_allocationLock.unlock(); @@ -1504,6 +1626,19 @@ return m_minReleaseAge; } + /** + * Session protection can only be used in preference to deferred frees when + * the minReleaseAge is zero. If so then two protection states are checked: + * either a positive activeTxCount incremented by the TransactionManager + * or if there are active AllocationContexts. + * + * The activeTxCount essentially protects read-only transactions while the + * AllocationContexts enable concurrent store allocations, whilst also + * supporting immediate re-cycling of localized allocations (those made + * and released within the same AllocationContext). 
+ * + * @return whether there is a logical active session + */ boolean isSessionProtected() { return m_minReleaseAge == 0 && (m_activeTxCount > 0 || !m_contexts.isEmpty()); } @@ -1515,11 +1650,16 @@ * * When called, will call through to the Allocators to re-sync the * transient bits with the committed and live. + * + * The writeCache is passed into the allocator to enable any "now free" + * allocations to be cleared from the cache. Until the session is released + * the writeCache must be maintained to support readers of uncommitted and + * unwritten allocations. */ void releaseSessions() { if (m_minReleaseAge == 0) { for (FixedAllocator fa : m_allocs) { - fa.releaseSession(); + fa.releaseSession(m_writeCache); } } } @@ -1559,6 +1699,10 @@ // private long immediateFreeCount = 0; private void immediateFree(final int addr, final int sze) { + immediateFree(addr, sze, false); + } + + private void immediateFree(final int addr, final int sze, final boolean overrideSession) { switch (addr) { case 0: @@ -1575,14 +1719,18 @@ throw new IllegalArgumentException("Invalid address provided to immediateFree: " + addr + ", size: " + sze); } final long pa = alloc.getPhysicalAddress(addrOffset); + if (log.isTraceEnabled()) log.trace("Freeing allocation at " + addr + ", physical address: " + pa); - alloc.free(addr, sze); + alloc.free(addr, sze, overrideSession); // must clear after free in case is a blobHdr that requires reading! 
// the allocation lock protects against a concurrent re-allocation // of the address before the cache has been cleared assert pa != 0; - m_writeCache.clearWrite(pa); + // only clear any existing write to cache if no active session + if (overrideSession || !this.isSessionProtected()) { + m_writeCache.clearWrite(pa); + } m_frees++; if (alloc.isAllocated(addrOffset)) throw new IllegalStateException("Reallocation problem with WriteCache"); @@ -1649,7 +1797,12 @@ final ArrayList<FixedAllocator> list = m_freeFixed[i]; if (list.size() == 0) { - allocator = new FixedAllocator(this, block);//, m_writeCache); + if (canAllocateDirect()) { + allocator = new DirectFixedAllocator(this, block); + } else { + allocator = new FixedAllocator(this, block); + } + allocator.setFreeList(list); allocator.setIndex(m_allocs.size()); @@ -1707,6 +1860,13 @@ } } + /** + * @return true if we have spare directBuffers. + */ + private boolean canAllocateDirect() { + return m_directBuffers != null && m_directBuffers.size() < cMaxDirectBuffers; + } + private int fixedAllocatorIndex(final int size) { int i = 0; @@ -1788,13 +1948,23 @@ } final int newAddr = alloc(size + 4, context); // allow size for checksum + + if (newAddr == 0) + throw new IllegalStateException("NULL address allocated"); final int chk = ChecksumUtility.getCHK().checksum(buf, size); + + final long pa = physicalAddress(newAddr); - try { - m_writeCache.write(physicalAddress(newAddr), ByteBuffer.wrap(buf, 0, size), chk); - } catch (InterruptedException e) { - throw new RuntimeException("Closed Store?", e); + // if from DirectFixedAllocator then physical address will be negative + if (pa < 0) { + directWrite(pa, buf, 0, size, chk); + } else { + try { + m_writeCache.write(pa, ByteBuffer.wrap(buf, 0, size), chk); + } catch (InterruptedException e) { + throw new RuntimeException("Closed Store?", e); + } } // Update counters. 
@@ -1875,12 +2045,19 @@ // } // } - /** + /** * Toss away all buffered writes and then reload from the current root * block. + * + * If the store is using DirectFixedAllocators then an IllegalStateException + * is thrown */ public void reset() { assertOpen(); + + if (m_directBuffers != null) + throw new IllegalStateException("Reset is not supported with direct buffers"); + if (log.isInfoEnabled()) { log.info("RWStore Reset"); } @@ -1915,7 +2092,7 @@ // notify of current file length. m_writeCache.setExtent(convertAddr(m_fileSize)); } catch (Exception e) { - throw new IllegalStateException("Unable reset the store", e); + throw new IllegalStateException("Unable to reset the store", e); } finally { m_allocationLock.unlock(); } @@ -1971,10 +2148,14 @@ if (addr == 0) { throw new IllegalStateException("Invalid metabits address: " + m_metaBitsAddr); } - try { - m_writeCache.write(addr, ByteBuffer.wrap(buf), 0, false); - } catch (InterruptedException e) { - throw new RuntimeException(e); + if (addr < 0) { + directWrite(addr, buf, 0, buf.length, 0); + } else { + try { + m_writeCache.write(addr, ByteBuffer.wrap(buf), 0, false); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } @@ -1993,8 +2174,11 @@ try { - checkDeferredFrees(true, journal); // free now if possible + final int totalFreed = checkDeferredFrees(true, journal); // free now if possible + if (totalFreed > 0 && log.isInfoEnabled()) { + log.info("Freed " + totalFreed + " deferralls on commit"); + } // free old storageStatsAddr if (m_storageStatsAddr != 0) { int len = (int) (m_storageStatsAddr & 0xFFFF); @@ -2017,14 +2201,14 @@ throw new IllegalStateException("Returned MetaBits Address not valid!"); } - // TODO: assert that m_deferredFreeOut is empty! - assert m_deferredFreeOut.getBytesWritten() == 0; - // Call immediateFree - no need to defer freeof metaBits, this // has to stop somewhere! 
// No more allocations must be made immediateFree((int) oldMetaBits, oldMetaBitsSize); + // There must be no buffered deferred frees + assert m_deferredFreeOut.getBytesWritten() == 0; + // save allocation headers final Iterator<Allocator> iter = m_commitList.iterator(); while (iter.hasNext()) { @@ -2097,8 +2281,10 @@ * <p> * Note: This method is package private in order to expose it to the unit * tests. + * + * returns number of addresses freed */ - /* public */void checkDeferredFrees(final boolean freeNow, + /* public */int checkDeferredFrees(final boolean freeNow, final Journal journal) { // Note: Invoked from unit test w/o the lock... @@ -2140,9 +2326,11 @@ * Note: This adds one to the lastDeferredReleaseTime to give * exclusive lower bound semantics. */ - freeDeferrals(journal, m_lastDeferredReleaseTime + 1, + return freeDeferrals(journal, m_lastDeferredReleaseTime + 1, latestReleasableTime); + } else { + return 0; } } @@ -2397,31 +2585,6 @@ return ret; } -// /* -// * clear -// * -// * reset the file size commit the root blocks -// */ -// public void clear() { -// try { -// baseInit(); -// -// m_fileSize = -4; -// m_metaStartAddr = m_fileSize; -// m_nextAllocation = -1; // keep on a 8K boundary (8K minimum -// // allocation) -// m_raf.setLength(convertAddr(m_fileSize)); -// -// m_curHdrAddr = 0; -// m_rootAddr = 0; -// -// startTransaction(); -// commitTransaction(); -// } catch (Exception e) { -// throw new StorageTerminalError("Unable to clear store", e); -// } -// } - public static long convertAddr(final int addr) { final long laddr = addr; if (laddr < 0) { @@ -2587,36 +2750,6 @@ return -1; } - -// // -------------------------------------------------------------------------------------- -// private String allocListStats(final List<Allocator> list, final AtomicLong counter) { -// final StringBuffer stats = new StringBuffer(); -// final Iterator<Allocator> iter = list.iterator(); -// while (iter.hasNext()) { -// stats.append(iter.next().getStats(counter)); -// 
} -// -// return stats.toString(); -// } -// -// public String getStats(final boolean full) { -// -// final AtomicLong counter = new AtomicLong(); -// -// final StringBuilder sb = new StringBuilder("FileSize : " + m_fileSize -// + " allocated : " + m_nextAllocation + "\r\n"); -// -// if (full) { -// -// sb.append(allocListStats(m_allocs, counter)); -// -// sb.append("Allocated : " + counter); -// -// } -// -// return sb.toString(); -// -// } public static class AllocationStats { public AllocationStats(final int i) { @@ -2626,11 +2759,29 @@ long m_reservedSlots; long m_filledSlots; } - /** - * Collected statistics are against each Allocation Block size. See - * {@link StorageStats#showStats(StringBuilder)} for details on the - * generated report. + * Utility debug outputing the allocator array, showing index, start + * address and alloc type/size + * + * Collected statistics are against each Allocation Block size: + * total number of slots | store size + * number of filled slots | store used + * <dl> + * <dt>AllocatorSize</dt><dd>The #of bytes in the allocated slots issued by this allocator.</dd> + * <dt>AllocatorCount</dt><dd>The #of fixed allocators for that slot size.</dd> + * <dt>SlotsInUse</dt><dd>The difference between the two previous columns (net slots in use for this slot size).</dd> + * <dt>SlotsReserved</dt><dd>The #of slots in this slot size which have had storage reserved for them.</dd> + * <dt>SlotsAllocated</dt><dd>Cumulative allocation of slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsRecycled</dt><dd>Cumulative recycled slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsChurn</dt><dd>How frequently slots of this size are re-allocated (SlotsInUse/SlotsAllocated).</dd> + * <dt>%SlotsUnused</dt><dd>The percentage of slots of this size which are not in use (1-(SlotsInUse/SlotsReserved)).</dd> + * <dt>BytesReserved</dt><dd>The space reserved on the backing file for those 
allocation slots</dd> + * <dt>BytesAppData</dt><dd>The #of bytes in the allocated slots which are used by application data (including the record checksum).</dd> + * <dt>%SlotWaste</dt><dd>How well the application data fits in the slots (BytesAppData/(SlotsInUse*AllocatorSize)).</dd> + * <dt>%AppData</dt><dd>How much of your data is stored by each allocator (BytesAppData/Sum(BytesAppData)).</dd> + * <dt>%StoreFile</dt><dd>How much of the backing file is reserved for each allocator (BytesReserved/Sum(BytesReserved)).</dd> + * <dt>%StoreWaste</dt><dd>How much of the total waste on the store is waste for this allocator size ((BytesReserved-BytesAppData)/(Sum(BytesReserved)-Sum(BytesAppData))).</dd> + * </dl> */ public void showAllocators(final StringBuilder str) { m_storageStats.showStats(str); @@ -2761,8 +2912,8 @@ final FixedAllocator allocator = getBlock(addr); final int offset = getOffset(addr); final long laddr = allocator.getPhysicalAddress(offset); - - return laddr; + + return allocator instanceof DirectFixedAllocator ? -laddr : laddr; } } @@ -2790,10 +2941,6 @@ return alloc; } -// private int blockIndex(int addr) { -// return (-addr) >>> OFFSET_BITS; -// } - private FixedAllocator getBlock(final int addr) { final int index = (-addr) >>> OFFSET_BITS; @@ -2804,24 +2951,6 @@ return (-addr) & OFFSET_BITS_MASK; // OFFSET_BITS } -// public int addr2Size(final int addr) { -// if (addr > 0) { -// int size = 0; -// -// final int index = ((int) addr) % 16; -// -// if (index == 15) { // blob -// throw new Error("FIX ME : legacy BLOB code being accessed somehow"); -// } else { -// size = m_minFixedAlloc * m_allocSizes[index]; -// } -// -// return size; -// } else { -// return getBlock(addr).getPhysicalSize(getOffset(addr)); -// } -// } - /** * The {@link RWStore} always generates negative address values. 
* @@ -2831,150 +2960,10 @@ return addr <= 0; } -// /******************************************************************************* -// * called when used as a server, returns whether facility is enabled, this -// * is the whole point of the wormStore - so the answer is true -// **/ -// public boolean preserveSessionData() { -// m_preserveSession = true; -// -// return true; -// } -// -// /******************************************************************************* -// * called by allocation blocks to determine whether they can re-allocate -// * data within this session. -// **/ -// protected boolean isSessionPreserved() { -// return m_preserveSession || m_contexts.size() > 0; -// } - -// /********************************************************************* -// * create backup file, copy data to it, and close it. -// **/ -// synchronized public void backup(String filename) throws FileNotFoundException, IOException { -// File destFile = new File(filename); -// destFile.createNewFile(); -// -// RandomAccessFile dest = new RandomAccessFile(destFile, "rw"); -// -// int bufSize = 64 * 1024; -// byte[] buf = new byte[bufSize]; -// -// m_raf.seek(0); -// -// int rdSize = bufSize; -// while (rdSize == bufSize) { -// rdSize = m_raf.read(buf); -// if (rdSize > 0) { -// dest.write(buf, 0, rdSize); -// } -// } -// -// dest.close(); -// } -// -// /********************************************************************* -// * copy storefile to output stream. 
-// **/ -// synchronized public void backup(OutputStream outstr) throws IOException { -// int bufSize = 64 * 1024; -// byte[] buf = new byte[bufSize]; -// -// m_raf.seek(0); -// -// int rdSize = bufSize; -// while (rdSize == bufSize) { -// rdSize = m_raf.read(buf); -// if (rdSize > 0) { -// outstr.write(buf, 0, rdSize); -// } -// } -// } -// -// synchronized public void restore(InputStream instr) throws IOException { -// int bufSize = 64 * 1024; -// byte[] buf = new byte[bufSize]; -// -// m_raf.seek(0); -// -// int rdSize = bufSize; -// while (rdSize == bufSize) { -// rdSize = instr.read(buf); -// if (rdSize > 0) { -// m_raf.write(buf, 0, rdSize); -// } -// } -// } - -// /*************************************************************************************** -// * Needed by PSOutputStream for BLOB buffer chaining. -// **/ -// public void absoluteWriteInt(final int addr, final int offset, final int value) { -// try { -// // must check write cache!!, or the write may be overwritten - just -// // flush for now -// m_writes.flush(); -// -// m_raf.seek(physicalAddress(addr) + offset); -// m_raf.writeInt(value); -// } catch (IOException e) { -// throw new StorageTerminalError("Unable to write integer", e); -// } -// } - -// /*************************************************************************************** -// * Needed to free Blob chains. -// **/ -// public int absoluteReadInt(final int addr, final int offset) { -// try { -// m_raf.seek(physicalAddress(addr) + offset); -// return m_raf.readInt(); -// } catch (IOException e) { -// throw new StorageTerminalError("Unable to write integer", e); -// } -// } - -// /*************************************************************************************** -// * Needed by PSOutputStream for BLOB buffer chaining. 
-// **/ -// public int bufferChainOffset() { -// return m_maxFixedAlloc - 4; -// } - public File getStoreFile() { return m_fd; } -// public boolean isLongAddress() { -// // always ints -// return false; -// } - -// public int absoluteReadLong(long addr, int offset) { -// throw new UnsupportedOperationException(); -// } -// -// public void absoluteWriteLong(long addr, int threshold, long value) { -// throw new UnsupportedOperationException(); -// } - -// public void absoluteWriteAddress(long addr, int threshold, long addr2) { -// absoluteWriteInt((int) addr, threshold, (int) addr2); -// } - -// public int getAddressSize() { -// return 4; -// } - -// public RandomAccessFile getRandomAccessFile() { -// return m_raf; -// } - -// public FileChannel getChannel() { -// return m_raf.getChannel(); -// } - public boolean requiresCommit() { return m_recentAlloc; } @@ -3359,8 +3348,9 @@ /** * Provided with the address of a block of addresses to be freed * @param blockAddr + * @return the total number of addresses freed */ - private void freeDeferrals(final long blockAddr, final long lastReleaseTime) { + private int freeDeferrals(final long blockAddr, final long lastReleaseTime) { final int addr = (int) (blockAddr >> 32); final int sze = (int) blockAddr & 0xFFFFFF; @@ -3371,9 +3361,12 @@ getData(addr, buf); final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); m_allocationLock.lock(); + int totalFreed = 0; try { int nxtAddr = strBuf.readInt(); + int cnt = 0; + while (nxtAddr != 0) { // while (false && addrs-- > 0) { if (nxtAddr > 0) { // Blob @@ -3386,6 +3379,8 @@ immediateFree(nxtAddr, 1); // size ignored for FixedAllocators } + totalFreed++; + nxtAddr = strBuf.readInt(); } m_lastDeferredReleaseTime = lastReleaseTime; @@ -3397,6 +3392,8 @@ } finally { m_allocationLock.unlock(); } + + return totalFreed; } /** @@ -3409,7 +3406,7 @@ * @param toTime * The exclusive upper bound. 
*/ - private void freeDeferrals(final AbstractJournal journal, + private int freeDeferrals(final AbstractJournal journal, final long fromTime, final long toTime) { @@ -3438,6 +3435,8 @@ if(log.isTraceEnabled()) log.trace("fromTime=" + fromTime + ", toTime=" + toTime); + int totalFreed = 0; + while (commitRecords.hasNext()) { final ITuple<CommitRecordIndex.Entry> tuple = commitRecords.next(); @@ -3452,12 +3451,13 @@ if (blockAddr != 0) { - freeDeferrals(blockAddr, record.getTimestamp()); + totalFreed += freeDeferrals(blockAddr, record.getTimestamp()); } } + return totalFreed; } /** @@ -3465,6 +3465,7 @@ * and an overall list of allocators. When the context is detached, all * allocators must be released and any that has available capacity will be * assigned to the global free lists. + * See {@link AllocBlock #releaseSession} * * @param context * The context to be released from all FixedAllocators. @@ -3485,9 +3486,9 @@ /** * The ContextAllocation object manages a freeList of associated allocators - * and an overall list of allocators. When the context is detached, all - * allocators must be released and any that has available capacity will be - * assigned to the global free lists. + * and an overall list of allocators. When the context is aborted then + * allocations made by that context should be released. + * See {@link AllocBlock #abortShadow} * * @param context * The context to be released from all FixedAllocators. 
@@ -3499,7 +3500,7 @@ final ContextAllocation alloc = m_contexts.remove(context); if (alloc != null) { - alloc.release(); + alloc.abort(); } } finally { m_allocationLock.unlock(); @@ -4352,5 +4353,59 @@ m_allocationLock.unlock(); } } + + /** + * A request for a direct allocation from a Direct ByteBuffer + * + * @param blockSize the size requested + * @return the address of the direct allocation + */ + public int allocateDirect(final int blockSize) { + final int allocBytes = blockSize << this.ALLOCATION_SCALEUP; + if (m_directSpaceAvailable < allocBytes) { + // try and allocate a further buffer + addDirectBuffer(); + } + + if (m_directSpaceAvailable < allocBytes) { + return -1; + } else { + final int ret = m_nextDirectAllocation; + m_nextDirectAllocation += allocBytes; + m_directSpaceAvailable -= allocBytes; + + return ret; + } + } + + /** + * Returns the slot size associated with this address + */ + public int getAssociatedSlotSize(int addr) { + return getBlock(addr).getBlockSize(); + } + + /** + * lockAddress adds the address passed to a lock list. This is for + * debug only and is not intended to be used generally for the live system. 
+ * + * @param addr - address to be locked + */ + public void lockAddress(int addr) { + m_allocationLock.lock(); + try { + if (m_lockAddresses == null) { + m_lockAddresses = new TreeMap<Integer, Integer>(); + } + + if (m_lockAddresses.containsKey(addr)) { + throw new IllegalStateException("address already locked " + addr); + } + + m_lockAddresses.put(addr, addr); + } finally { + m_allocationLock.unlock(); + } + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -207,7 +207,7 @@ return store.divide(size, 2, RoundingMode.HALF_UP).floatValue(); } public float totalWaste(long total) { - if (usedStore() == 0) + if (total == 0) return 0.0f; long slotWaste = reservedStore() - usedStore(); @@ -234,18 +234,30 @@ return allocs.divide(used, 2, RoundingMode.HALF_UP).floatValue(); } public float slotsUnused() { + if (m_totalSlots == 0) { + return 0.0f; + } + BigDecimal used = new BigDecimal(100 * (m_totalSlots-usedSlots())); BigDecimal total = new BigDecimal(m_totalSlots); if(total.signum()==0) return 0f; return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); } public float percentAllocations(long totalAllocations) { + if (totalAllocations == 0) { + return 0.0f; + } + BigDecimal used = new BigDecimal(100 * m_slotAllocations); BigDecimal total = new BigDecimal(totalAllocations); if(total.signum()==0) return 0f; return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); } public float percentSlotsInuse(long totalInuse) { + if (totalInuse == 0) { + return 0.0f; + } + BigDecimal used = new BigDecimal(100 * usedSlots()); BigDecimal total = new BigDecimal(totalInuse); if(total.signum()==0) return 0f; @@ -508,6 +520,9 
@@ } private float dataPercent(long usedData, long totalData) { + if (totalData == 0) + return 0.0f; + BigDecimal used = new BigDecimal(100 * usedData); BigDecimal total = new BigDecimal(totalData); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -103,6 +103,11 @@ final Journal journal = new Journal(properties); + final IBufferStrategy bufferStrategy = journal.getBufferStrategy(); + if (bufferStrategy instanceof RWStrategy) { + ((RWStrategy)bufferStrategy).getRWStore().activateTx(); + } + try { // if(journal.getBufferStrategy() instanceof MappedBufferStrategy) { @@ -118,7 +123,7 @@ // } doConcurrentClientTest(journal,// - 10,// timeout + 30,// timeout 20,// nresources 1, // minLocks 3, // maxLocks @@ -129,6 +134,9 @@ ); } finally { + if (bufferStrategy instanceof RWStrategy) { + ... [truncated message content] |