From: <mar...@us...> - 2010-11-12 15:48:17
|
Revision: 3940 http://bigdata.svn.sourceforge.net/bigdata/?rev=3940&view=rev Author: martyncutcher Date: 2010-11-12 15:48:11 +0000 (Fri, 12 Nov 2010) Log Message: ----------- BlobAllocatorless blob implementation Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-12 00:58:16 UTC (rev 3939) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-12 15:48:11 UTC (rev 3940) @@ -24,10 +24,13 @@ package com.bigdata.rwstore; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import org.apache.log4j.Logger; @@ -140,7 +143,7 @@ * PSOutputStream impl. */ - private int[] m_blobHeader = null; + private ArrayList<Integer> m_blobHeader = null; private byte[] m_buf = null; private boolean m_isSaved = false; // private long m_headAddr = 0; @@ -176,11 +179,8 @@ m_blobThreshold = maxAlloc-4; // allow for checksum - final int maxHdrSize = RWStore.BLOB_FIXED_ALLOCS * 4; - final int bufSize = m_blobThreshold > maxHdrSize ? m_blobThreshold : maxHdrSize; - - if (m_buf == null || m_buf.length != bufSize) - m_buf = new byte[bufSize]; + if (m_buf == null || m_buf.length != m_blobThreshold) + m_buf = new byte[m_blobThreshold]; reset(); } @@ -218,12 +218,16 @@ if (m_count == m_blobThreshold && !m_writingHdr) { if (m_blobHeader == null) { - m_blobHeader = new int[RWStore.BLOB_FIXED_ALLOCS]; // max 16K - m_blobHdrIdx = 0; + int hdrSize = m_blobThreshold/4; + if (hdrSize > RWStore.BLOB_FIXED_ALLOCS) + hdrSize = RWStore.BLOB_FIXED_ALLOCS; + m_blobHeader = new ArrayList<Integer>(); // only support header + // m_blobHdrIdx = 0; } final int curAddr = (int) m_store.alloc(m_buf, m_count, m_context); - m_blobHeader[m_blobHdrIdx++] = curAddr; + // m_blobHeader[m_blobHdrIdx++] = curAddr; + m_blobHeader.add(curAddr); m_count = 0; } @@ -324,28 +328,40 @@ if (m_blobHeader != null) { try { m_writingHdr = true; // ensure that header CAN be a BLOB - m_blobHeader[m_blobHdrIdx++] = addr; + // m_blobHeader[m_blobHdrIdx++] = addr; + m_blobHeader.add(addr); final int precount = m_count; m_count = 0; try { - writeInt(m_blobHdrIdx); - for (int i = 0; i < m_blobHdrIdx; i++) { - writeInt(m_blobHeader[i]); +// writeInt(m_blobHdrIdx); +// for (int i = 0; i < m_blobHdrIdx; i++) { +// writeInt(m_blobHeader[i]); +// } + int hdrBufSize = 4*(m_blobHeader.size() + 1); + ByteArrayOutputStream hdrbuf = new ByteArrayOutputStream(hdrBufSize); + DataOutputStream hdrout = new DataOutputStream(hdrbuf); + hdrout.writeInt(m_blobHeader.size()); + for (int i = 0; i < m_blobHeader.size(); i++) { + hdrout.writeInt(m_blobHeader.get(i)); } - addr = (int) m_store.alloc(m_buf, m_count, m_context); + hdrout.flush(); + + byte[] outbuf = hdrbuf.toByteArray(); + addr = (int) m_store.alloc(outbuf, hdrBufSize, m_context); - if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count) / m_blobThreshold)) { - throw new IllegalStateException( - "PSOutputStream.save at : " + addr - + ", bytes: " + m_bytesWritten - + ", blocks: " + m_blobHdrIdx - + ", last alloc: " + precount); - } +// if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count) / m_blobThreshold)) { +// throw new IllegalStateException( +// "PSOutputStream.save at : " + addr +// + ", bytes: " + m_bytesWritten +// + ", blocks: " + m_blobHdrIdx +// + ", last alloc: " + precount); +// } if (log.isDebugEnabled()) log.debug("Writing BlobHdrIdx with " + m_blobHdrIdx + " allocations"); - addr = m_store.registerBlob(addr); // returns handle + // DO NOT USE BLOB ALLOCATOR + // addr = m_store.registerBlob(addr); // returns handle } catch (IOException e) { e.printStackTrace(); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-12 00:58:16 UTC (rev 3939) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-12 15:48:11 UTC (rev 3940) @@ -281,6 +281,8 @@ static final int ALLOCATION_SCALEUP = 16; // multiplier to convert allocations based on minimum allocation of 32k static private final int META_ALLOCATION = 8; // 8 * 32K is size of meta Allocation + // Maximum fixed allocs in a BLOB, but do restrict to size that will fit within a single fixed allocation + // Ignored static final int BLOB_FIXED_ALLOCS = 2048; // private ICommitCallback m_commitCallback; // @@ -578,11 +580,14 @@ } final int maxBlockLessChk = m_maxFixedAlloc-4; - // set this at blob header references max 4096 fixed allocs - // meaning that header may itself be a blob if max fixed is - // less than 16K - m_maxBlobAllocSize = (BLOB_FIXED_ALLOCS * maxBlockLessChk); + // ensure that BLOB header cannot itself be a BLOB +// int blobFixedAlocs = maxBlockLessChk/4; +// if (blobFixedAlocs > RWStore.BLOB_FIXED_ALLOCS) +// blobFixedAlocs = RWStore.BLOB_FIXED_ALLOCS; +// m_maxBlobAllocSize = ((maxBlockLessChk/4) * maxBlockLessChk); + m_maxBlobAllocSize = Integer.MAX_VALUE; + assert m_maxFixedAlloc > 0; m_deferredFreeOut = PSOutputStream.getNew(this, m_maxFixedAlloc, null); @@ -754,8 +759,9 @@ final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); + // Can handle minor store version incompatibility final int storeVersion = strBuf.readInt(); - if (storeVersion != cVersion) { + if ((storeVersion & 0xFF00) != (cVersion & 0xFF00)) { throw new IllegalStateException("Incompatible RWStore header version"); } m_lastDeferredReleaseTime = strBuf.readLong(); @@ -1105,13 +1111,14 @@ } } - final Allocator na = getBlock((int) addr); - if (! (na instanceof BlobAllocator)) { - throw new IllegalStateException("Invalid Allocator index"); - } - final BlobAllocator ba = (BlobAllocator) na; - final int hdraddr = ba.getBlobHdrAddress(getOffset((int) addr)); - getData(hdraddr, hdrbuf); // read in header - could itself be a blob! +// final Allocator na = getBlock((int) addr); +// if (! (na instanceof BlobAllocator)) { +// throw new IllegalStateException("Invalid Allocator index"); +// } +// final BlobAllocator ba = (BlobAllocator) na; +// final int hdraddr = ba.getBlobHdrAddress(getOffset((int) addr)); +// getData(hdraddr, hdrbuf); // read in header - could itself be a blob! + getData(addr, hdrbuf); // fine but MUST NOT allow header to be a BLOB! final DataInputStream hdrstr = new DataInputStream(new ByteArrayInputStream(hdrbuf)); final int rhdrs = hdrstr.readInt(); if (rhdrs != nblocks) { @@ -1369,23 +1376,27 @@ } m_allocationLock.lock(); try { - final Allocator alloc = getBlockByAddress(addr); - /* - * There are a few conditions here. If the context owns the - * allocator and the allocation was made by this context then - * it can be freed immediately. - * The problem comes when the context is null and the allocator - * is NOT owned, BUT there are active AllocationContexts, in this - * situation, the free must ALWAYS be deferred. - */ - final boolean alwaysDefer = context == null && m_contexts.size() > 0; - if (alwaysDefer) - if (log.isDebugEnabled()) - log.debug("Should defer " + physicalAddress(addr)); - if (/*alwaysDefer ||*/ !alloc.canImmediatelyFree(addr, sze, context)) { - deferFree(addr, sze); + if (sze > m_maxFixedAlloc) { + freeBlob(addr, sze, context); } else { - immediateFree(addr, sze); + final Allocator alloc = getBlockByAddress(addr); + /* + * There are a few conditions here. If the context owns the + * allocator and the allocation was made by this context then + * it can be freed immediately. + * The problem comes when the context is null and the allocator + * is NOT owned, BUT there are active AllocationContexts, in this + * situation, the free must ALWAYS be deferred. + */ + final boolean alwaysDefer = context == null && m_contexts.size() > 0; + if (alwaysDefer) + if (log.isDebugEnabled()) + log.debug("Should defer " + physicalAddress(addr)); + if (/*alwaysDefer ||*/ !alloc.canImmediatelyFree(addr, sze, context)) { + deferFree(addr, sze); + } else { + immediateFree(addr, sze); + } } } finally { m_allocationLock.unlock(); @@ -1393,7 +1404,34 @@ } -// private long immediateFreeCount = 0; + private boolean freeBlob(final int hdr_addr, final int sze, final IAllocationContext context) { + if (sze < (m_maxFixedAlloc-4)) + throw new IllegalArgumentException("Unexpected address size"); + + final int alloc = m_maxFixedAlloc-4; + final int blcks = (alloc - 1 + sze)/alloc; + + // read in header block, then free each reference + final byte[] hdr = new byte[(blcks+1) * 4 + 4]; // add space for checksum + getData(hdr_addr, hdr); + + final DataInputStream instr = new DataInputStream( + new ByteArrayInputStream(hdr, 0, hdr.length-4) ); + try { + final int allocs = instr.readInt(); + for (int i = 0; i < allocs; i++) { + final int nxt = instr.readInt(); + free(nxt, m_maxFixedAlloc); + } + free(hdr_addr, hdr.length); + + return true; + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + // private long immediateFreeCount = 0; private void immediateFree(final int addr, final int sze) { switch (addr) { @@ -1573,9 +1611,9 @@ if (size > (m_maxFixedAlloc - 4)) { - if (size > (BLOB_FIXED_ALLOCS * (m_maxFixedAlloc - 4))) + if (size > getMaxBlobSize()) throw new IllegalArgumentException( - "Allocation request beyond maximum BLOB"); + "Allocation request beyond maximum BLOB of " + getMaxBlobSize()); if (log.isTraceEnabled()) log.trace("BLOB ALLOC: " + size); @@ -1585,8 +1623,8 @@ try { int i = 0; - final int lsize = size - 512; - while (i < lsize) { + final int blocks = size/512; + for (int b = 0; b < blocks; b++) { psout.write(buf, i, 512); // add 512 bytes at a time i += 512; } @@ -1984,8 +2022,14 @@ * Use BCD-style numbering so * 0x0200 == 2.00 * 0x0320 == 3.20 + * + * The minor byte values should maintain binary compatibility, with + * major bytes + * Versions + * 0x0300 - extended header to include reserved ints + * 0x0400 - removed explicit BlobAllocators */ - final private int cVersion = 0x0300; + final private int cVersion = 0x0400; /** * MetaBits Header @@ -2419,12 +2463,10 @@ * number of filled slots | store used */ public void showAllocators(final StringBuilder str) { - final AllocationStats[] stats = new AllocationStats[m_allocSizes.length+1]; - for (int i = 0; i < stats.length-1; i++) { + final AllocationStats[] stats = new AllocationStats[m_allocSizes.length]; + for (int i = 0; i < stats.length; i++) { stats[i] = new AllocationStats(m_allocSizes[i]*64); } - // for BLOBs - stats[stats.length-1] = new AllocationStats(0); final Iterator<Allocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { @@ -2456,7 +2498,7 @@ tfilled += filled; tfilledSlots += stats[i].m_filledSlots; } - for (int i = 0; i < stats.length-1; i++) { + for (int i = 0; i < stats.length; i++) { final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; str.append(padRight("" + stats[i].m_blockSize, 10)); @@ -2466,9 +2508,6 @@ str.append(padLeft("" + (treserved==0?0:(reserved * 100 / treserved)) + "%", 8)); str.append("\n"); } - // lastly some BLOB stats - only interested in used/reserved slots - str.append(padRight("BLOB", 10)); - str.append(padLeft("" + stats[stats.length-1].m_filledSlots, 12) + padLeft("" + stats[stats.length-1].m_reservedSlots, 12)); str.append("\n"); str.append(padRight("Totals", 10)); @@ -3405,6 +3444,11 @@ int ret = m_minFixedAlloc; while (data_len > ret) { i++; + // If we write directly to the writeCache then the data_len + // may be larger than largest slot + if (i == m_allocSizes.length) + return data_len; + ret = 64 * m_allocSizes[i]; } @@ -4059,7 +4103,7 @@ } public int getMaxBlobSize() { - return this.m_maxBlobAllocSize-4; + return m_maxBlobAllocSize-4; // allow for checksum } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2010-11-12 00:58:16 UTC (rev 3939) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2010-11-12 15:48:11 UTC (rev 3940) @@ -126,6 +126,15 @@ properties.setProperty(Options.WRITE_CACHE_ENABLED, "" + writeCacheEnabled); + // number of bits in FixedAllocators + properties.setProperty(com.bigdata.rwstore.RWStore.Options.DEFAULT_FREE_BITS_THRESHOLD, "1000"); + + // Size of META_BITS_BLOCKS + properties.setProperty(com.bigdata.rwstore.RWStore.Options.DEFAULT_META_BITS_SIZE, "9"); + + // properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16,32"); // 2K max + properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16"); // 1K + return properties; } @@ -238,6 +247,8 @@ } public Properties getProperties() { + + System.out.println("TestRWJournal:getProperties"); final Properties properties = super.getProperties(); @@ -254,10 +265,26 @@ properties.setProperty(Options.WRITE_CACHE_ENABLED, "" + writeCacheEnabled); + // number of bits in FixedAllocators + properties.setProperty(RWStore.Options.FREE_BITS_THRESHOLD, "50"); + + // Size of META_BITS_BLOCKS + properties.setProperty(RWStore.Options.META_BITS_SIZE, "9"); + + // properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16,32,48,64,128"); // 8K - max blob = 2K * 8K = 16M + // properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16,32,48,64,128"); // 2K max + properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16"); // 2K max + return properties; } + protected IRawStore getStore() { + + return new Journal(getProperties()); + + } + // /** // * Test that allocate() pre-extends the store when a record is allocated // * which would overflow the current user extent. @@ -758,10 +785,10 @@ int endBlob = 1024 * 1256; int[] faddrs = allocBatchBuffer(rw, 100, startBlob, endBlob); - System.out.println("Final allocation: " + rw.physicalAddress(faddrs[99]) - + ", allocations: " + (rw.getTotalAllocations() - numAllocs) - + ", allocated bytes: " + (rw.getTotalAllocationsSize() - startAllocations)); - } finally { + final StringBuilder str = new StringBuilder(); + rw.showAllocators(str); + System.out.println(str); + } finally { store.destroy(); @@ -776,17 +803,17 @@ final Journal store = (Journal) getStore(); try { + final RWStrategy bs = (RWStrategy) store + .getBufferStrategy(); - byte[] buf = new byte[1024 * 2048]; // 2Mb buffer of random data + final RWStore rw = bs.getRWStore(); + + + byte[] buf = new byte[2 * 1024 * 1024]; // 5Mb buffer of random data r.nextBytes(buf); ByteBuffer bb = ByteBuffer.wrap(buf); - RWStrategy bs = (RWStrategy) store - .getBufferStrategy(); - - RWStore rw = bs.getRWStore(); - long faddr = bs.write(bb); // rw.alloc(buf, buf.length); log.info("Blob Allocation at " + rw.convertFromAddr(faddr)); @@ -842,6 +869,12 @@ assertEquals(bb, rdBuf); + // now delete the memory + bs.delete(faddr); // immediateFree! + + faddr = bs.write(bb); // rw.alloc(buf, buf.length); + bb.position(0); + System.out.println("Now commit to disk"); store.commit(); @@ -862,12 +895,12 @@ rw.checkDeferredFrees(true, store); try { - rdBuf = bs.read(faddr); // should fail with illegal state + rdBuf = bs.read(faddr); // should fail with illegal argument throw new RuntimeException("Fail"); } catch (Exception ise) { - assertTrue("Expected IllegalStateException reading from " + (faddr >> 32) + " instead got: " + ise, ise instanceof IllegalStateException); + assertTrue("Expected IllegalArgumentException reading from " + (faddr >> 32) + " instead got: " + ise, ise instanceof IllegalArgumentException); } - + } finally { store.destroy(); @@ -1038,9 +1071,9 @@ // allocBatch(store, 1, 32, 650, 100000000); allocBatch(store, 1, 32, 650, 50000); store.commit(); - System.out.println("Final allocations: " + rw.getTotalAllocations() - + ", allocated bytes: " + rw.getTotalAllocationsSize() + ", file length: " - + rw.getStoreFile().length()); + final StringBuilder str = new StringBuilder(); + rw.showAllocators(str); + System.out.println(str); store.close(); System.out.println("Re-open Journal"); store = (Journal) getStore(); @@ -1065,7 +1098,7 @@ long realAddr = 0; try { // allocBatch(store, 1, 32, 650, 100000000); - pureAllocBatch(store, 1, 32, 3075, 300000); // cover wider range of blocks + pureAllocBatch(store, 1, 32, rw.m_maxFixedAlloc-4, 300000); // cover wider range of blocks store.commit(); System.out.println("Final allocations: " + rw.getTotalAllocations() + ", allocated bytes: " + rw.getTotalAllocationsSize() + ", file length: " @@ -1106,7 +1139,7 @@ .getBufferStrategy(); RWStore rw = bs.getRWStore(); - int freeAddr[] = new int[2048]; + int freeAddr[] = new int[512]; int freeCurs = 0; for (int i = 0; i < grp; i++) { int alloc = min + r.nextInt(sze-min); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |