From: <mar...@us...> - 2010-11-09 11:48:28
|
Revision: 3920 http://bigdata.svn.sourceforge.net/bigdata/?rev=3920&view=rev Author: martyncutcher Date: 2010-11-09 11:48:21 +0000 (Tue, 09 Nov 2010) Log Message: ----------- Add version info to MetaBits header, correctly recycle PSOutputStreams, and fix erroneous maxAlloc assertions Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-11-08 23:56:01 UTC (rev 3919) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-11-09 11:48:21 UTC (rev 3920) @@ -56,13 +56,13 @@ } public int alloc(RWStore store, int size, IAllocationContext context) { - assert size > m_store.m_maxFixedAlloc; + assert size > (m_store.m_maxFixedAlloc-4); return 0; } public boolean free(int addr, int sze) { - if (sze < m_store.m_maxFixedAlloc) + if (sze < (m_store.m_maxFixedAlloc-4)) throw new IllegalArgumentException("Unexpected address size"); int alloc = m_store.m_maxFixedAlloc-4; int blcks = (alloc - 1 + sze)/alloc; @@ -103,8 +103,8 @@ } public int getFirstFixedForBlob(int addr, int sze) { - if (sze < m_store.m_maxFixedAlloc) - throw new IllegalArgumentException("Unexpected address size"); + if (sze < (m_store.m_maxFixedAlloc-4)) + throw new IllegalArgumentException("Unexpected address size: " + sze); int alloc = m_store.m_maxFixedAlloc-4; int blcks = (alloc - 1 + sze)/alloc; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-08 23:56:01 UTC (rev 3919) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-09 11:48:21 UTC (rev 3920) @@ -79,27 +79,24 @@ private static PSOutputStream m_poolHead = null; private static PSOutputStream m_poolTail = null; - private static Integer m_lock = new Integer(42); private static int m_streamCount = 0; - public static PSOutputStream getNew(final IStore store, final int maxAlloc, final IAllocationContext context) { - synchronized (m_lock) { - PSOutputStream ret = m_poolHead; - if (ret != null) { - m_streamCount--; - - m_poolHead = ret.next(); - if (m_poolHead == null) { - m_poolTail = null; - } - } else { - ret = new PSOutputStream(); - } + public static synchronized PSOutputStream getNew(final IStore store, final int maxAlloc, final IAllocationContext context) { + PSOutputStream ret = m_poolHead; + if (ret != null) { + m_streamCount--; - ret.init(store, maxAlloc, context); - - return ret; + m_poolHead = ret.next(); + if (m_poolHead == null) { + m_poolTail = null; + } + } else { + ret = new PSOutputStream(); } + + ret.init(store, maxAlloc, context); + + return ret; } /******************************************************************* @@ -110,23 +107,21 @@ * maximum of 10 streams are maintained - adding up to 80K to the * garbage collect copy. **/ - static void returnStream(PSOutputStream stream) { - synchronized (m_lock) { - if (m_streamCount > 10) { - return; - } - - stream.m_count = 0; // avoid overflow - - if (m_poolTail != null) { - m_poolTail.setNext(stream); - } else { - m_poolHead = stream; - } - - m_poolTail = stream; - m_streamCount++; + static synchronized void returnStream(PSOutputStream stream) { + if (m_streamCount > 10) { + return; } + + stream.m_count = 0; // avoid overflow + + if (m_poolTail != null) { + m_poolTail.setNext(stream); + } else { + m_poolHead = stream; + } + + m_poolTail = stream; + m_streamCount++; } private int[] m_blobHeader = null; @@ -161,6 +156,7 @@ void init(IStore store, int maxAlloc, IAllocationContext context) { m_store = store; m_context = context; + m_next = null; m_blobThreshold = maxAlloc-4; // allow for checksum @@ -357,11 +353,6 @@ return m_bytesWritten; } - protected void finalize() throws Throwable { - close(); - super.finalize(); - } - public OutputStream getFilterWrapper(final boolean saveBeforeClose) { return new FilterOutputStream(this) { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-08 23:56:01 UTC (rev 3919) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-09 11:48:21 UTC (rev 3920) @@ -283,7 +283,7 @@ static final int ALLOCATION_SCALEUP = 16; // multiplier to convert allocations based on minimum allocation of 32k static private final int META_ALLOCATION = 8; // 8 * 32K is size of meta Allocation - static final int BLOB_FIXED_ALLOCS = 1024; + static final int BLOB_FIXED_ALLOCS = 2048; // private ICommitCallback m_commitCallback; // // public void setCommitCallback(final ICommitCallback callback) { @@ -760,6 +760,10 @@ final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); + final int storeVersion = strBuf.readInt(); + if (storeVersion != cVersion) { + throw new IllegalStateException("Incompatible RWStore header version"); + } m_lastDeferredReleaseTime = strBuf.readLong(); cDefaultMetaBitsSize = strBuf.readInt(); @@ -768,7 +772,7 @@ for (int i = 0; i < allocBlocks; i++) { m_allocSizes[i] = strBuf.readInt(); } - m_metaBitsSize = metaBitsStore - allocBlocks - 4; // allow for deferred free + m_metaBitsSize = metaBitsStore - allocBlocks - cMetaHdrFields; // allow for header fields m_metaBits = new int[m_metaBitsSize]; if (log.isInfoEnabled()) { log.info("Raw MetaBitsAddr: " + rawmbaddr); @@ -1594,6 +1598,14 @@ throw new RuntimeException("Closed Store?", e); + } finally { + try { + psout.close(); // return stream + } catch (IOException ioe) { + // should not happen, since this should only be + // recycling + log.warn("Unexpected error closing PSOutputStream", ioe); + } } } @@ -1750,11 +1762,12 @@ // frees. // the cDefaultMetaBitsSize is also written since this can now be // parameterized. - final int len = 4 * (2 + 1 + 1 + m_allocSizes.length + m_metaBits.length); + final int len = 4 * (cMetaHdrFields + m_allocSizes.length + m_metaBits.length); final byte buf[] = new byte[len]; final FixedOutputStream str = new FixedOutputStream(buf); try { + str.writeInt(cVersion); str.writeLong(m_lastDeferredReleaseTime); str.writeInt(cDefaultMetaBitsSize); @@ -1941,6 +1954,7 @@ ints += 9 * allocBlocks; ints += 2; // for deferredFreeListAddr and size + ints += 1; // for version return ints*4; // return as bytes } @@ -1960,6 +1974,17 @@ */ /** + * MetaBits HEADER version must be changed when the header or allocator + * serialization changes + * + * Use BCD-style numbering so + * 0x0200 == 2.00 + * 0x0320 == 3.20 + */ + final private int cVersion = 0x0200; + + final private int cMetaHdrFields = 5; // version, deferredFree(long), + /** * @see Options#META_BITS_SIZE */ private int cDefaultMetaBitsSize; @@ -2393,11 +2418,12 @@ str.append("RWStore Allocation Summary\n"); str.append("-------------------------\n"); str.append(padRight("Allocator", 10)); - str.append(padLeft("Slots used", 12)); - str.append(padLeft("available", 12)); - str.append(padLeft("Store used", 14)); - str.append(padLeft("available", 14)); + str.append(padLeft("SlotsUsed", 12)); + str.append(padLeft("reserved", 12)); + str.append(padLeft("StoreUsed", 14)); + str.append(padLeft("reserved", 14)); str.append(padLeft("Usage", 8)); + str.append(padLeft("Store", 8)); str.append("\n"); long treserved = 0; long treservedSlots = 0; @@ -2410,11 +2436,16 @@ final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; tfilled += filled; tfilledSlots += stats[i].m_filledSlots; + } + for (int i = 0; i < stats.length; i++) { + final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; + final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; str.append(padRight("" + stats[i].m_blockSize, 10)); str.append(padLeft("" + stats[i].m_filledSlots, 12) + padLeft("" + stats[i].m_reservedSlots, 12)); str.append(padLeft("" + filled, 14) + padLeft("" + reserved, 14)); str.append(padLeft("" + (reserved==0?0:(filled * 100 / reserved)) + "%", 8)); + str.append(padLeft("" + (treserved==0?0:(reserved * 100 / treserved)) + "%", 8)); str.append("\n"); } str.append(padRight("Totals", 10)); @@ -2704,16 +2735,15 @@ } /** - * Note that the representation of the - * + * The * @return long representation of metaBitsAddr PLUS the size */ public long getMetaBitsAddr() { long ret = physicalAddress((int) m_metaBitsAddr); ret <<= 16; - // include space for allocSizes and deferred free info AND cDefaultMetaBitsSize - final int metaBitsSize = 2 + 1 + m_metaBits.length + m_allocSizes.length + 1; + // include space for version, allocSizes and deferred free info AND cDefaultMetaBitsSize + final int metaBitsSize = cMetaHdrFields + m_metaBits.length + m_allocSizes.length; ret += metaBitsSize; if (log.isTraceEnabled()) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |