From: <mar...@us...> - 2010-11-08 11:09:15
|
Revision: 3909 http://bigdata.svn.sourceforge.net/bigdata/?rev=3909&view=rev Author: martyncutcher Date: 2010-11-08 11:09:09 +0000 (Mon, 08 Nov 2010) Log Message: ----------- Handle BlobHeaders larger than the maximum fixed allocation, where the header must itself be a BLOB. This is possible with lower allocation settings; e.g., with 1K max fixed allocators, a 500K BLOB would require a 2K header to hold the 500 fixed allocation references. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-07 12:49:25 UTC (rev 3908) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-08 11:09:09 UTC (rev 3909) @@ -414,7 +414,7 @@ if (m_freeBits++ == 0 && false) { m_freeWaiting = false; m_freeList.add(this); - } else if (m_freeWaiting && m_freeBits == 3000) { + } else if (m_freeWaiting && m_freeBits == m_store.cDefaultFreeBitsThreshold) { m_freeWaiting = false; m_freeList.add(this); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-07 12:49:25 UTC (rev 3908) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-08 11:09:09 UTC (rev 3909) @@ -145,6 +145,8 @@ private PSOutputStream m_next = null; private int m_blobHdrIdx; + + private boolean m_writingHdr = false; private PSOutputStream next() { return m_next; @@ -162,8 +164,12 
@@ m_context = context; m_blobThreshold = maxAlloc-4; // allow for checksum - if (m_buf == null || m_buf.length != m_blobThreshold) - m_buf = new byte[m_blobThreshold]; + + final int maxHdrSize = RWStore.BLOB_FIXED_ALLOCS * 4; + final int bufSize = m_blobThreshold > maxHdrSize ? m_blobThreshold : maxHdrSize; + + if (m_buf == null || m_buf.length != bufSize) + m_buf = new byte[bufSize]; reset(); } @@ -199,9 +205,9 @@ throw new IllegalStateException("Writing to saved PSOutputStream"); } - if (m_count == m_blobThreshold) { + if (m_count == m_blobThreshold && !m_writingHdr) { if (m_blobHeader == null) { - m_blobHeader = new int[RWStore.BLOB_FIXED_ALLOCS]; + m_blobHeader = new int[RWStore.BLOB_FIXED_ALLOCS]; // max 16K m_blobHdrIdx = 0; } @@ -305,27 +311,32 @@ int addr = (int) m_store.alloc(m_buf, m_count, m_context); if (m_blobHeader != null) { - m_blobHeader[m_blobHdrIdx++] = addr; - int precount = m_count; - m_count = 0; - try { - writeInt(m_blobHdrIdx); - for (int i = 0; i < m_blobHdrIdx; i++) { - writeInt(m_blobHeader[i]); - } - addr = (int) m_store.alloc(m_buf, m_count, m_context); - - if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count)/m_blobThreshold)) { - throw new IllegalStateException("PSOutputStream.save at : " + addr + ", bytes: "+ m_bytesWritten + ", blocks: " + m_blobHdrIdx + ", last alloc: " + precount); - } - - if (log.isDebugEnabled()) - log.debug("Writing BlobHdrIdx with " + m_blobHdrIdx + " allocations"); - - addr = m_store.registerBlob(addr); // returns handle - } catch (IOException e) { - e.printStackTrace(); - } + try { + m_writingHdr = true; // ensure that header CAN be a BLOB + m_blobHeader[m_blobHdrIdx++] = addr; + int precount = m_count; + m_count = 0; + try { + writeInt(m_blobHdrIdx); + for (int i = 0; i < m_blobHdrIdx; i++) { + writeInt(m_blobHeader[i]); + } + addr = (int) m_store.alloc(m_buf, m_count, m_context); + + if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count)/m_blobThreshold)) { + throw new 
IllegalStateException("PSOutputStream.save at : " + addr + ", bytes: "+ m_bytesWritten + ", blocks: " + m_blobHdrIdx + ", last alloc: " + precount); + } + + if (log.isDebugEnabled()) + log.debug("Writing BlobHdrIdx with " + m_blobHdrIdx + " allocations"); + + addr = m_store.registerBlob(addr); // returns handle + } catch (IOException e) { + e.printStackTrace(); + } + } finally { + m_writingHdr = false; + } } m_isSaved = true; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-07 12:49:25 UTC (rev 3908) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-08 11:09:09 UTC (rev 3909) @@ -245,6 +245,18 @@ String DEFAULT_META_BITS_SIZE = "9"; + /** + * Defines the number of bits that must be free in a FixedAllocator for + * it to be added to the free list. This is used to ensure a level + * of locality when making large numbers of allocations within a single + * commit. + * <p> + * The value should be >= 1 and <= 5000 + */ + String FREE_BITS_THRESHOLD = RWStore.class.getName() + ".freeBitsThreshold"; + + String DEFAULT_FREE_BITS_THRESHOLD = "300"; + } /* @@ -341,6 +353,12 @@ final int m_minFixedAlloc; /** + * Currently we do not support a Blob header to be a Blob, so the + * maximum possible Blob is ((maxFixed-4) * maxFixed) - 4. + */ + final int m_maxBlobAllocSize; + + /** * This lock is used to exclude readers when the extent of the backing file * is about to be changed. 
* <p> @@ -477,7 +495,16 @@ m_metaBitsSize = cDefaultMetaBitsSize; - m_metaBits = new int[m_metaBitsSize]; + cDefaultFreeBitsThreshold = Integer.valueOf(fileMetadata.getProperty( + Options.FREE_BITS_THRESHOLD, + Options.DEFAULT_FREE_BITS_THRESHOLD)); + + if (cDefaultFreeBitsThreshold < 1 || cDefaultFreeBitsThreshold > 5000) { + throw new IllegalArgumentException(Options.FREE_BITS_THRESHOLD + + " : Must be between 1 and 5000"); + } + + m_metaBits = new int[m_metaBitsSize]; m_metaTransientBits = new int[m_metaBitsSize]; @@ -556,6 +583,12 @@ m_minFixedAlloc = m_allocSizes[0]*64; } + final int maxBlockLessChk = m_maxFixedAlloc-4; + // set this at blob header references max 4096 fixed allocs + // meaning that header may itself be a blob if max fixed is + // less than 16K + m_maxBlobAllocSize = (BLOB_FIXED_ALLOCS * maxBlockLessChk); + assert m_maxFixedAlloc > 0; m_deferredFreeOut = PSOutputStream.getNew(this, m_maxFixedAlloc, null); @@ -668,7 +701,7 @@ + metaBitsAddr + ", m_commitCounter: " + commitCounter); } - + /** * Should be called where previously initFileSpec was used. * @@ -728,13 +761,14 @@ final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); m_lastDeferredReleaseTime = strBuf.readLong(); + cDefaultMetaBitsSize = strBuf.readInt(); final int allocBlocks = strBuf.readInt(); m_allocSizes = new int[allocBlocks]; for (int i = 0; i < allocBlocks; i++) { m_allocSizes[i] = strBuf.readInt(); } - m_metaBitsSize = metaBitsStore - allocBlocks - 3; // allow for deferred free + m_metaBitsSize = metaBitsStore - allocBlocks - 4; // allow for deferred free m_metaBits = new int[m_metaBitsSize]; if (log.isInfoEnabled()) { log.info("Raw MetaBitsAddr: " + rawmbaddr); @@ -854,10 +888,10 @@ * Meta-Allocations stored as {int address; int[8] bits}, so each block * holds 8*32=256 allocation slots of 1K totaling 256K. 
*/ - for (int b = 0; b < m_metaBits.length; b += 9) { + for (int b = 0; b < m_metaBits.length; b += cDefaultMetaBitsSize) { final long blockStart = convertAddr(m_metaBits[b]); final int startBit = (b * 32) + 32; - final int endBit = startBit + (8*32); + final int endBit = startBit + ((cDefaultMetaBitsSize-1)*32); for (int i = startBit; i < endBit; i++) { if (tstBit(m_metaBits, i)) { final long addr = blockStart + ((i-startBit) * ALLOC_BLOCK_SIZE); @@ -1061,8 +1095,19 @@ + m_maxFixedAlloc); final byte[] hdrbuf = new byte[4 * (nblocks + 1) + 4]; // plus 4 bytes for checksum - final BlobAllocator ba = (BlobAllocator) getBlock((int) addr); - getData(ba.getBlobHdrAddress(getOffset((int) addr)), hdrbuf); // read in header + if (hdrbuf.length > m_maxFixedAlloc) { + if (log.isInfoEnabled()) { + log.info("LARGE BLOB - header is BLOB"); + } + } + + final Allocator na = getBlock((int) addr); + if (! (na instanceof BlobAllocator)) { + throw new IllegalStateException("Invalid Allocator index"); + } + final BlobAllocator ba = (BlobAllocator) na; + final int hdraddr = ba.getBlobHdrAddress(getOffset((int) addr)); + getData(hdraddr, hdrbuf); // read in header - could itself be a blob! final DataInputStream hdrstr = new DataInputStream(new ByteArrayInputStream(hdrbuf)); final int rhdrs = hdrstr.readInt(); if (rhdrs != nblocks) { @@ -1703,13 +1748,16 @@ // used to free the deferedFree allocations. This is used to determine // which commitRecord to access to process the nextbatch of deferred // frees. - final int len = 4 * (2 + 1 + m_allocSizes.length + m_metaBits.length); + // the cDefaultMetaBitsSize is also written since this can now be + // parameterized. 
+ final int len = 4 * (2 + 1 + 1 + m_allocSizes.length + m_metaBits.length); final byte buf[] = new byte[len]; final FixedOutputStream str = new FixedOutputStream(buf); try { str.writeLong(m_lastDeferredReleaseTime); - + str.writeInt(cDefaultMetaBitsSize); + str.writeInt(m_allocSizes.length); for (int i = 0; i < m_allocSizes.length; i++) { str.writeInt(m_allocSizes[i]); @@ -1914,11 +1962,18 @@ /** * @see Options#META_BITS_SIZE */ - private final int cDefaultMetaBitsSize; + private int cDefaultMetaBitsSize; /** * @see Options#META_BITS_SIZE */ volatile private int m_metaBitsSize; + /** + * Package private since it is used by FixedAllocators + * + * @see Options#META_BITS_SIZE + */ + final int cDefaultFreeBitsThreshold; + private int m_metaBits[]; private int m_metaTransientBits[]; // volatile private int m_metaStartAddr; @@ -2075,7 +2130,7 @@ // final int bitsPerBlock = 9 * 32; final int intIndex = bit / 32; // divide 32; - final int addrIndex = (intIndex/9)*9; + final int addrIndex = (intIndex/cDefaultMetaBitsSize)*cDefaultMetaBitsSize; final long addr = convertAddr(m_metaBits[addrIndex]); final int intOffset = bit - ((addrIndex+1) * 32); @@ -2620,8 +2675,8 @@ long ret = physicalAddress((int) m_metaBitsAddr); ret <<= 16; - // include space for allocSizes and deferred free info - final int metaBitsSize = 2 + m_metaBits.length + m_allocSizes.length + 1; + // include space for allocSizes and deferred free info AND cDefaultMetaBitsSize + final int metaBitsSize = 2 + 1 + m_metaBits.length + m_allocSizes.length + 1; ret += metaBitsSize; if (log.isTraceEnabled()) @@ -3225,8 +3280,8 @@ } -// log.warn("Context: ncontexts=" + m_contexts.size() + ", context=" -// + context); + log.warn("Context: ncontexts=" + m_contexts.size() + ", context=" + + context); } @@ -3893,4 +3948,8 @@ } + public int getMaxBlobSize() { + return this.m_maxBlobAllocSize-4; + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |