|
From: <mar...@us...> - 2014-07-17 15:46:28
|
Revision: 8568
http://sourceforge.net/p/bigdata/code/8568
Author: martyncutcher
Date: 2014-07-17 15:46:25 +0000 (Thu, 17 Jul 2014)
Log Message:
-----------
Add allocation strategy to reduce IOPs by allocating small slots from sparsely allocated regions. This enables the BufferedWrite to join adjacent slot writes into a single IO. Ticket #986
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/IBufferedWriter.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheCounters.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2014-07-17 14:57:42 UTC (rev 8567)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2014-07-17 15:46:25 UTC (rev 8568)
@@ -31,10 +31,12 @@
import com.bigdata.counters.CAT;
import com.bigdata.counters.CounterSet;
+import com.bigdata.counters.Instrument;
import com.bigdata.io.DirectBufferPool;
import com.bigdata.io.FileChannelUtility;
import com.bigdata.io.IBufferAccess;
import com.bigdata.io.IReopenChannel;
+import com.bigdata.rwstore.RWStore;
/**
* The BufferedWrite merges/elides sorted scattered writes to minimize IO
@@ -88,12 +90,7 @@
*/
private long m_endAddr = 0;
- /*
- * Counters.
- */
- private final CAT m_dataBytes = new CAT();
- private final CAT m_dataWrites = new CAT();
- private final CAT m_fileWrites = new CAT();
+ private final RWStore.StoreCounters<?> m_storeCounters;
public BufferedWrite(final IBufferedWriter store) throws InterruptedException {
@@ -102,6 +99,8 @@
m_store = store;
+ m_storeCounters = m_store.getStoreCounters();
+
m_data.set( DirectBufferPool.INSTANCE.acquire() );
}
@@ -162,7 +161,7 @@
public int write(final long offset, final ByteBuffer data,
final IReopenChannel<FileChannel> opener) throws IOException {
- m_dataWrites.increment();
+ m_storeCounters.bufferDataWrites++;
final int data_len = data.remaining();
final int slot_len = m_store.getSlotSize(data_len);
@@ -239,12 +238,12 @@
}
// increment by the amount of data currently in the buffer.
- m_dataBytes.add( m_data.position() );
+ m_storeCounters.bufferDataBytes += m_data.position();
// write out the data in the buffer onto the backing channel.
m_data.flip();
final int nwrites = FileChannelUtility.writeAll(opener, m_data, m_startAddr);
- m_fileWrites.add(nwrites);
+ m_storeCounters.bufferFileWrites += nwrites;
reset();
@@ -280,19 +279,19 @@
public String getStats(final StringBuffer buf, final boolean reset) {
- final String ret = "BufferedWrites, data: " + m_dataWrites + ", file: " + m_fileWrites + ", bytes: " + m_dataBytes;
+ final String ret = "BufferedWrites, data: " + m_storeCounters.bufferDataWrites + ", file: " + m_storeCounters.bufferFileWrites + ", bytes: " + m_storeCounters.bufferDataBytes;
if (buf != null) {
buf.append(ret + "\n");
}
if (reset) {
- m_dataBytes.set(0L);
- m_fileWrites.set(0L);
- m_dataWrites.set(0L);
+ m_storeCounters.bufferFileWrites = 0;
+ m_storeCounters.bufferDataWrites = 0;
+ m_storeCounters.bufferDataBytes = 0;
}
return ret;
- }
+ }
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/IBufferedWriter.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/IBufferedWriter.java 2014-07-17 14:57:42 UTC (rev 8567)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/IBufferedWriter.java 2014-07-17 15:46:25 UTC (rev 8568)
@@ -1,7 +1,11 @@
package com.bigdata.io.writecache;
+import com.bigdata.rwstore.RWStore;
+
public interface IBufferedWriter {
int getSlotSize(int data_len);
+ RWStore.StoreCounters<?> getStoreCounters();
+
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheCounters.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheCounters.java 2014-07-17 14:57:42 UTC (rev 8567)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheCounters.java 2014-07-17 15:46:25 UTC (rev 8568)
@@ -182,7 +182,7 @@
setValue(elapsedWriteNanos / 1000000000.);
}
});
-
+
return root;
} // getCounters()
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2014-07-17 14:57:42 UTC (rev 8567)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2014-07-17 15:46:25 UTC (rev 8568)
@@ -49,7 +49,7 @@
public class FixedAllocator implements Allocator {
private static final Logger log = Logger.getLogger(FixedAllocator.class);
-
+
private final int cModAllocation = 1 << RWStore.ALLOCATION_SCALEUP;
private final int cMinAllocation = cModAllocation * 1; // must be multiple of cModAllocation
@@ -204,20 +204,26 @@
private ArrayList m_freeList;
public void setFreeList(final ArrayList list) {
- m_freeList = list;
+ if (m_freeList != list) {
+ m_freeList = list;
+ m_freeWaiting = true;
+ }
- if (!m_pendingContextCommit && hasFree()) {
- m_freeList.add(this);
- m_freeWaiting = false;
+ if (!m_pendingContextCommit && hasFree() && meetsSmallSlotThreshold()) {
+ addToFreeList();
}
+
}
/**
* To support postHACommit an allocator can be removed from the current freelist
*/
void removeFromFreeList() {
- if (m_freeList != null)
+ if (m_freeList != null) {
+ // log.warn("Removing allocator " + m_index + " from free list");
m_freeList.remove(this);
+ m_freeWaiting = true;
+ }
}
@@ -405,9 +411,13 @@
final int calcFree = calcFreeBits();
final int calcLiveFree = calcLiveFreeBits();
- return m_freeBits == calcFree
+ final boolean ret = m_freeBits == calcFree
&& (m_freeBits + m_freeTransients) == calcLiveFree;
+
+ if (!ret)
+ throw new AssertionError("m_free: " + m_freeBits + ", calcFree: " + calcFree);
+ return ret;
}
// read does not read in m_size since this is read to determine the class of
@@ -466,6 +476,13 @@
private int m_startAddr = 0;
private int m_endAddr = 0;
+
+ /**
+ * For "small slot" allocators the allocation search is
+ * always from bit areas with less than a maximum density to
+ * ensure that writes have better locality.
+ */
+ int m_allocIndex = -1;
/**
* The #of int32 values in a single {@link AllocBlock} region. The
@@ -533,6 +550,51 @@
}
/**
+ * find the allocationIndex of first "sparsely committed" AllocBlock.
+ *
+ * Checks the committed bits of all the AllocBlocks until one is found with
+ * > 50% free (or less than 50% allocated) of the committed bits.
+ * @param store
+ * @param i
+ */
+ void resetAllocIndex() {
+ resetAllocIndex(0);
+ }
+
+ void resetAllocIndex(final int start) {
+ m_allocIndex = start;
+
+ if (m_size <= 1024) {
+ for (int a = m_allocIndex/m_bitSize; a < m_allocBlocks.size(); a++) {
+ final AllocBlock ab = m_allocBlocks.get(a);
+
+ checkBlock(ab);
+
+ for (int i = (m_allocIndex%m_bitSize); i < m_bitSize; i++) {
+ // first check if transients are already full
+ if (ab.m_transients[i] != 0xFFFFFFFF) {
+ // then check maximum 50% commit allocated
+ if (Integer.bitCount(ab.m_commit[i]) < 16) {
+ final AllocBlock abr = m_allocBlocks.get(m_allocIndex/m_bitSize);
+ assert abr == ab;
+
+ return;
+ }
+ }
+ m_allocIndex++;
+ }
+ }
+
+ // must remove from free list if we cannot set the alloc Index for a small slot
+ if (start == 0) {
+ removeFromFreeList();
+ } else {
+ resetAllocIndex(0);
+ }
+ }
+ }
+
+ /**
* This determines the size of the reservation required in terms of
* the number of ints each holding bits for 32 slots.
*
@@ -693,6 +755,7 @@
try {
if (log.isDebugEnabled())
checkBits();
+
if (((AllocBlock) m_allocBlocks.get(block))
.freeBit(offset % nbits, m_sessionActive && !overideSession)) { // bit adjust
@@ -721,7 +784,7 @@
if (log.isDebugEnabled())
checkBits();
- return true;
+ return true;
} else if (addr >= m_startAddr && addr < m_endAddr) {
final Iterator<AllocBlock> iter = m_allocBlocks.iterator();
@@ -755,112 +818,227 @@
private void checkFreeList() {
if (m_freeWaiting && !m_pendingContextCommit) {
- if (m_freeBits >= m_store.cDefaultFreeBitsThreshold) {
- m_freeWaiting = false;
+ if (meetsSmallSlotThreshold()) {
- if (log.isDebugEnabled())
- log.debug("Returning Allocator to FreeList - " + m_size);
+ addToFreeList();
- m_freeList.add(this);
+ resetAllocIndex(0);
}
}
}
+
+ private void addToFreeList() {
+ assert m_freeWaiting;
+
+ m_freeWaiting = false;
+ m_freeList.add(this);
+ m_allocIndex = -1;
+
+ if (log.isDebugEnabled())
+ log.debug("Returning Allocator to FreeList - " + m_size);
+ }
+
+ private boolean meetsSmallSlotThreshold() {
+ // check threshold for all slots
+ if (m_freeBits < m_store.cDefaultFreeBitsThreshold) {
+ return false;
+ }
+
+ // then check for small slots
+ if (m_size <= m_store.cSmallSlot) { // it's a small slot; see Options#SMALL_SLOT_TYPE
+ return m_freeBits > m_store.cSmallSlotThreshold;
+ } else {
+ return true;
+ }
+ }
/**
* The introduction of IAllocationContexts has added some complexity to
* the older concept of a free list. With AllocationContexts it is
* possibly for allocator to have free space available but this being
- * restricted to a specific AllocaitonContext. The RWStore alloc method
- * must therefore handle the
+ * restricted to a specific AllocationContext.
+ * <p>
+ * In addition to the standard free allocation search we want to add a
+ * "density" restriction for small slots to encourage the aggregation
+ * of writes (by increasing the likelihood of sibling slot allocation).
+ * <p>
+ * There is some "Do What I mean" complexity here, with difficulty in
+ * determining a good rule to identify an initial allocation point. There
+ * is a danger of significantly reducing the allocation efficiency of
+ * short transactions if we too naively check committed bit density. We
+ * should only do this when identifying the initial allocation, and when
+ * the allocIndex is incremented.
*/
- public int alloc(final RWStore store, final int size, final IAllocationContext context) {
+ public int alloc(final RWStore store, final int size,
+ final IAllocationContext context) {
try {
- if (size <= 0)
- throw new IllegalArgumentException(
- "Allocate requires positive size, got: " + size);
+ if (size <= 0)
+ throw new IllegalArgumentException(
+ "Allocate requires positive size, got: " + size);
- if (size > m_size)
- throw new IllegalArgumentException(
- "FixedAllocator with slots of " + m_size
- + " bytes requested allocation for "+ size + " bytes");
+ if (size > m_size)
+ throw new IllegalArgumentException(
+ "FixedAllocator with slots of " + m_size
+ + " bytes requested allocation for " + size
+ + " bytes");
- int addr = -1;
+ if (m_freeBits == 0) {
+ throw new IllegalStateException("Request to allocate from " + m_size + "byte slot FixedAllocator with zero bits free - should not be on the Free List");
+ }
+
+ int addr = -1;
+
+ // Special allocation for small slots
+ if (m_size <= m_store.cSmallSlot) {
+ return allocFromIndex(size);
+ }
- final Iterator<AllocBlock> iter = m_allocBlocks.iterator();
- int count = -1;
- while (addr == -1 && iter.hasNext()) {
- count++;
+ final Iterator<AllocBlock> iter = m_allocBlocks.iterator();
+ int count = -1;
+ while (addr == -1 && iter.hasNext()) {
+ count++;
- final AllocBlock block = iter.next();
- if (block.m_addr == 0) {
- int blockSize = 32 * m_bitSize;
+ final AllocBlock block = iter.next();
+ checkBlock(block);
+
+ addr = block.alloc(m_size);
+ }
+
+ if (addr != -1) {
+
+ addr += 3; // Tweak to ensure non-zero address for offset 0
+
+ if (--m_freeBits == 0) {
+ if (log.isTraceEnabled())
+ log.trace("Remove from free list");
+ removeFromFreeList();
+
+ // Should have been first on list, now check for first
+ if (m_freeList.size() > 0) {
+ if (log.isDebugEnabled()) {
+ final FixedAllocator nxt = (FixedAllocator) m_freeList
+ .get(0);
+ log.debug("Freelist head: " + nxt.getSummaryStats());
+ }
+ }
+ }
+
+ addr += (count * 32 * m_bitSize);
+
+ final int value = -((m_index << RWStore.OFFSET_BITS) + addr);
+
if (m_statsBucket != null) {
- m_statsBucket.addSlots(blockSize);
+ m_statsBucket.allocate(size);
}
- blockSize *= m_size;
- blockSize >>= RWStore.ALLOCATION_SCALEUP;
- block.m_addr = grabAllocation(store, blockSize);
- if (log.isDebugEnabled())
- log.debug("Allocation block at " + block.m_addr + " of " + (blockSize << 16) + " bytes");
+ return value;
+ } else {
+ StringBuilder sb = new StringBuilder();
+ sb.append("FixedAllocator returning null address, with freeBits: "
+ + m_freeBits + "\n");
- if (m_startAddr == 0) {
- m_startAddr = block.m_addr;
+ for (AllocBlock ab : m_allocBlocks) {
+ sb.append(ab.show() + "\n");
}
- m_endAddr = block.m_addr - blockSize;
+
+ log.error(sb);
+
+ return 0;
}
- addr = block.alloc(m_size);
+ } finally {
+ if (log.isDebugEnabled())
+ checkBits();
}
+ }
+
+ void checkBlock(final AllocBlock block) {
+ if (block.m_addr == 0) {
+ int blockSize = 32 * m_bitSize;
+ if (m_statsBucket != null) {
+ m_statsBucket.addSlots(blockSize);
+ }
+ blockSize *= m_size;
+ blockSize >>= RWStore.ALLOCATION_SCALEUP;
- if (addr != -1) {
+ block.m_addr = grabAllocation(m_store, blockSize);
+ if (log.isDebugEnabled())
+ log.debug("Allocation block at " + block.m_addr
+ + " of " + (blockSize << 16) + " bytes");
- addr += 3; // Tweak to ensure non-zero address for offset 0
-
- if (--m_freeBits == 0) {
- if (log.isTraceEnabled())
- log.trace("Remove from free list");
- m_freeList.remove(this);
- m_freeWaiting = true;
-
- // Should have been first on list, now check for first
- if (m_freeList.size() > 0) {
- if (log.isDebugEnabled()) {
- final FixedAllocator nxt = (FixedAllocator) m_freeList.get(0);
- log.debug("Freelist head: " + nxt.getSummaryStats());
- }
- }
+ if (m_startAddr == 0) {
+ m_startAddr = block.m_addr;
}
+ m_endAddr = block.m_addr - blockSize;
+ }
- addr += (count * 32 * m_bitSize);
-
- final int value = -((m_index << RWStore.OFFSET_BITS) + addr);
+ }
+
+ int allocFromIndex(final int size) {
+
+ if (m_allocIndex == -1) {
+ resetAllocIndex();
- if (m_statsBucket != null) {
- m_statsBucket.allocate(size);
+ if (m_allocIndex == -1) {
+ throw new AssertionError("Unable to set AllocIndex with m_freeBits: " + m_freeBits);
}
-
-
- return value;
- } else {
- StringBuilder sb = new StringBuilder();
- sb.append("FixedAllocator returning null address, with freeBits: " + m_freeBits + "\n");
+ }
- for (AllocBlock ab: m_allocBlocks) {
- sb.append(ab.show() + "\n");
- }
-
- log.error(sb);
+ if (log.isDebugEnabled())
+ checkBits();
- return 0;
+
+ if (m_freeBits != calcFreeBits()) {
+ final int calc = calcFreeBits();
+ throw new AssertionError("m_freeBits != calcFreeBits() : " + m_freeBits + "!=" + calc);
}
- } finally {
- if (log.isDebugEnabled())
- checkBits();
+
+ // there MUST be bits free in the m_allocIndex block
+ final AllocBlock ab = m_allocBlocks.get(m_allocIndex/m_bitSize);
+
+ if (ab.m_addr == 0) {
+ throw new AssertionError("No allocation for AllocBlock with m_allocIndex: " + m_allocIndex);
}
+
+ final int abblock = m_allocIndex % m_bitSize;
+
+ assert ab.m_transients[abblock] != 0xFFFFFFFF; // not all set
+
+ final int bit = RWStore.fndBit(ab.m_transients[abblock]);
+
+ assert bit >= 0;
+
+ m_freeBits--;
+
+ final int abit = (abblock*32) + bit;
+ RWStore.setBit(ab.m_live, abit);
+ RWStore.setBit(ab.m_transients, abit);
+
+ // Note +3 for address tweak for special low order bits
+ final int addr = -((m_index << RWStore.OFFSET_BITS) + (m_allocIndex*32) + (bit + 3));
+
+ // Now check current index
+ if (ab.m_transients[abblock] == 0xFFFFFFFF) {
+ // find next allocIndex
+ resetAllocIndex(m_allocIndex+1);
+ }
+
+ if (m_freeBits != calcFreeBits()) {
+ throw new AssertionError("m_freeBits != calcFreeBits()");
+ }
+ // assert m_freeBits == calcFreeBits();
+
+ if (m_statsBucket != null) {
+ m_statsBucket.allocate(size);
+ }
+
+ return addr;
}
protected int grabAllocation(RWStore store, int blockSize) {
- return store.allocBlock(blockSize);
+
+ final int ret = store.allocBlock(blockSize);
+
+ return ret;
}
public boolean hasFree() {
@@ -1040,9 +1218,16 @@
}
m_freeTransients = transientbits();
+ m_freeBits = calcFreeBits();
+ // Ensure allocIndex is reset
+ m_allocIndex = -1;
+
assert calcSessionFrees();
+ if (log.isDebugEnabled())
+ checkBits();
+
return isolatedWrites;
}
@@ -1145,6 +1330,8 @@
}
ab.setBitExternal(bit);
+
+ m_freeBits--;
}
public int getSlotSize() {
@@ -1231,8 +1418,8 @@
if (m_pendingContextCommit) {
m_pendingContextCommit = false;
- if (hasFree()) {
- m_freeList.add(this);
+ if (m_freeWaiting && meetsSmallSlotThreshold()) {
+ addToFreeList();
}
}
@@ -1241,15 +1428,17 @@
// Handle re-addition to free list once transient frees are
// added back
- if (m_freeWaiting && m_freeBits >= m_store.cDefaultFreeBitsThreshold) {
- m_freeList.add(this);
- m_freeWaiting = false;
+ if (m_freeWaiting && meetsSmallSlotThreshold()) {
+ addToFreeList();
}
m_freeTransients = 0;
}
+ if (log.isDebugEnabled())
+ checkBits();
+
}
/*
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-07-17 14:57:42 UTC (rev 8567)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-07-17 15:46:25 UTC (rev 8568)
@@ -67,6 +67,7 @@
import com.bigdata.btree.ITupleIterator;
import com.bigdata.btree.IndexMetadata;
import com.bigdata.cache.ConcurrentWeakValueCache;
+import com.bigdata.counters.CAT;
import com.bigdata.counters.CounterSet;
import com.bigdata.counters.Instrument;
import com.bigdata.counters.striped.StripedCounters;
@@ -357,6 +358,24 @@
String DEFAULT_FREE_BITS_THRESHOLD = "300";
/**
+ * Defines the size of a slot that defines it as a small slot.
+ * <p>
+ * Any slot equal to or less than this is considered a small slot and
+ * its availability for allocation is restricted to ensure a high
+ * chance that contiguous allocations can be made.
+ * <p>
+ * This is arranged by only returning small slot allocators to the free list
+ * if they have greater than 50% available slots, and then only allocating
+ * slots from sparse regions with >= 50% free/committed bits.
+ * <p>
+ * Small slot processing can be disabled by setting the smallSlotType to zero.
+ */
+ String SMALL_SLOT_TYPE = RWStore.class.getName() + ".smallSlotType";
+
+ // String DEFAULT_SMALL_SLOT_TYPE = "1024"; // standard default
+ String DEFAULT_SMALL_SLOT_TYPE = "0"; // initial default to no special processing
+
+ /**
* When <code>true</code>, scattered writes which are strictly ascending
* will be coalesced within a buffer and written out as a single IO
* (default {@value #DEFAULT_DOUBLE_BUFFER_WRITES}). This improves write
@@ -820,7 +839,16 @@
throw new IllegalArgumentException(Options.FREE_BITS_THRESHOLD
+ " : Must be between 1 and 5000");
}
-
+
+ cSmallSlot = Integer.valueOf(fileMetadata.getProperty(
+ Options.SMALL_SLOT_TYPE,
+ Options.DEFAULT_SMALL_SLOT_TYPE));
+
+ if (cSmallSlot < 0 || cSmallSlot > 2048) {
+ throw new IllegalArgumentException(Options.SMALL_SLOT_TYPE
+ + " : Must be between 0 and 2048");
+ }
+
m_metaBits = new int[m_metaBitsSize];
m_metaTransientBits = new int[m_metaBitsSize];
@@ -2117,37 +2145,13 @@
// With a non-null WCS, the actual read should be via a callback to readRaw, it should not get here
// unless it is not possible to cache - but maybe even then the WCS should read into a temporary
// buffer
- final long beginDisk = System.nanoTime();
- // If checksum is required then the buffer should be sized to include checksum in final 4 bytes
+
+ // If checksum is required then the buffer should be sized to include checksum in final 4 bytes
final ByteBuffer bb = ByteBuffer.wrap(buf, offset, length);
// Use ReadRaw - should be the same read all
readRaw(paddr, bb);
- // enable for debug
- if (false) {//FIXME EXTENSION_LOCK REQUIRED FOR IO.
- final byte[] nbuf = new byte[buf.length];
- final ByteBuffer nbb = ByteBuffer.wrap(nbuf, offset, length);
- FileChannelUtility.readAll(m_reopener, nbb, paddr);
- if (!Arrays.equals(buf, nbuf))
- throw new AssertionError();
-
- m_diskReads++;
- // Update counters.
- final StoreCounters<?> c = (StoreCounters<?>) storeCounters.get()
- .acquire();
- try {
- final int nbytes = length;
- c.nreads++;
- c.bytesRead += nbytes;
- c.bytesReadFromDisk += nbytes;
- c.elapsedReadNanos += (System.nanoTime() - begin);
- c.elapsedDiskReadNanos += (System.nanoTime() - beginDisk);
- } finally {
- c.release();
- }
- }
-
final int chk = ChecksumUtility.getCHK().checksum(buf, offset, length-4); // read checksum
final int tstchk = bb.getInt(offset + length-4);
if (chk != tstchk) {
@@ -2669,6 +2673,10 @@
}
final int addr = allocator.alloc(this, size, context);
+
+ if (addr == 0) {
+ throw new IllegalStateException("Free Allocator unable to allocate address: " + allocator.getSummaryStats());
+ }
if (allocator.isUnlocked() && !m_commitList.contains(allocator)) {
m_commitList.add(allocator);
@@ -3360,7 +3368,7 @@
+ m_metaBitsAddr + ", active contexts: "
+ m_contexts.size());
- if (log.isDebugEnabled() && m_quorum.isHighlyAvailable()) {
+ if (log.isDebugEnabled() && m_quorum != null && m_quorum.isHighlyAvailable()) {
log.debug(showAllocatorList());
@@ -3624,6 +3632,10 @@
*/
final int cDefaultFreeBitsThreshold;
+ final int cSmallSlotThreshold = 4096; // debug test
+
+ int cSmallSlot = 1024; // @see from Options#SMALL_SLOT_TYPE
+
/**
* Each "metaBit" is a file region
*/
@@ -3952,7 +3964,7 @@
private void extendFile() {
final int adjust = -1200 + (m_fileSize / 10);
-
+
extendFile(adjust);
}
@@ -4050,12 +4062,21 @@
static int fndBit(final int[] bits, final int offset, final int size) {
final int eob = size + offset;
- for (int i = offset; i < eob; i++) {
- if (bits[i] != 0xFFFFFFFF) {
- for (int k = 0; k < 32; k++) {
- if ((bits[i] & (1 << k)) == 0) {
- return (i * 32) + k;
- }
+ for (int i = offset; i < eob; i++) {
+ final int b = fndBit(bits[i]);
+ if (b != -1) {
+ return (i * 32) + b;
+ }
+ }
+
+ return -1;
+ }
+
+ static int fndBit(final int bits) {
+ if (bits != 0xFFFFFFFF) {
+ for (int k = 0; k < 32; k++) {
+ if ((bits & (1 << k)) == 0) {
+ return k;
}
}
}
@@ -5493,19 +5514,27 @@
* #of times one of the root blocks has been written.
*/
public volatile long nwriteRootBlock;
+
+ /**
+ * buffer counters
+ */
+ public volatile long bufferDataBytes;
+ public volatile long bufferDataWrites;
+ public volatile long bufferFileWrites;
/**
* {@inheritDoc}
*/
public StoreCounters() {
- super();
+ super();
}
- /**
+ /**
* {@inheritDoc}
*/
public StoreCounters(final int batchSize) {
super(batchSize);
+
}
/**
@@ -5602,7 +5631,6 @@
ntruncate = 0;
nreopen = 0;
nwriteRootBlock = 0;
-
}
@Override
@@ -5695,9 +5723,25 @@
}
});
+
} // IRawStore
- // disk statistics
+ // BufferedWriter
+ final CounterSet bc = root.makePath("buffer");
+
+ bc.addCounter("ndataWrites", new Instrument<Long>() {
+ public void sample() {
+ setValue(bufferDataWrites);
+ }
+ });
+
+ bc.addCounter("nfileWrites", new Instrument<Long>() {
+ public void sample() {
+ setValue(bufferFileWrites);
+ }
+ });
+
+ // disk statistics
{
final CounterSet disk = root.makePath("disk");
@@ -6180,19 +6224,32 @@
final int position = dst.position();
try {
+ final long beginDisk = System.nanoTime();
+
// the offset into the disk file.
// final long pos = FileMetadata.headerSize0 + offset;
final long pos = offset;
+ final int length = dst.limit();
// read on the disk.
final int ndiskRead = FileChannelUtility.readAll(m_reopener,
dst, pos);
+ m_diskReads += ndiskRead;
+
+ final long now = System.nanoTime();
+
// update performance counters.
final StoreCounters<?> c = (StoreCounters<?>) storeCounters
.get().acquire();
try {
c.ndiskRead += ndiskRead;
+ final int nbytes = length;
+ c.nreads++;
+ c.bytesRead += nbytes;
+ c.bytesReadFromDisk += nbytes;
+ c.elapsedReadNanos += now - beginDisk;
+ c.elapsedDiskReadNanos += now - beginDisk;
} finally {
c.release();
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2014-07-17 14:57:42 UTC (rev 8567)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2014-07-17 15:46:25 UTC (rev 8568)
@@ -639,6 +639,26 @@
}
+ protected IRawStore getSmallSlotStore() {
+
+ return getSmallSlotStore(0);
+
+ }
+
+ protected IRawStore getSmallSlotStore(final int slotSize) {
+
+ final Properties properties = new Properties(getProperties());
+
+ properties.setProperty(
+ AbstractTransactionService.Options.MIN_RELEASE_AGE, "0");
+
+ properties.setProperty(
+ RWStore.Options.SMALL_SLOT_TYPE, "" + slotSize);
+
+ return getStore(properties);
+
+ }
+
protected Journal getStore(final long retentionMillis) {
final Properties properties = new Properties(getProperties());
@@ -830,6 +850,114 @@
}
/**
+ * Ensures the allocation of unique addresses by mapping allocated
+ * address with uniqueness assertion against physical address.
+ */
+ public void test_addressingContiguous() {
+
+ final Journal store = (Journal) getStore();
+
+ try {
+
+ final RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy();
+
+ final RWStore rw = bufferStrategy.getStore();
+ final int cSlotSize = 128;
+ final int cAllocSize = 99;
+
+ long pap = rw.physicalAddress(rw.alloc(cAllocSize, null));
+ for (int i = 0; i < 500000; i++) {
+ final int a = rw.alloc(cAllocSize, null);
+ final long pa = rw.physicalAddress(a);
+
+ if (pa != (pap+cSlotSize)) {
+ // for debug
+ rw.physicalAddress(a);
+ fail("Non-Contiguous slots: " + i + ", " + pa + "!=" + (pap+cSlotSize));
+ }
+
+ pap = pa;
+
+ }
+
+ store.commit();
+
+ final StringBuilder sb = new StringBuilder();
+ rw.showAllocators(sb);
+
+ log.warn(sb.toString());
+
+ } finally {
+
+ store.destroy();
+
+ }
+
+ }
+
+ /**
+ * Tests the recycling of small slot allocators and outputs statistics related
+ * to contiguous allocations indicative of reduced IOPS.
+ */
+ public void test_smallSlotRecycling() {
+
+ final Journal store = (Journal) getSmallSlotStore(1024);
+
+ try {
+
+ final RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy();
+
+ final RWStore rw = bufferStrategy.getStore();
+ final int cSlotSize = 128;
+ final int cAllocSize = 99;
+
+ int breaks = 0;
+ int contiguous = 0;
+
+ ArrayList<Integer> recycle = new ArrayList<Integer>();
+
+ long pap = rw.physicalAddress(rw.alloc(cAllocSize, null));
+ for (int i = 0; i < 500000; i++) {
+ final int a = rw.alloc(cSlotSize, null);
+ final long pa = rw.physicalAddress(a);
+
+ if (r.nextInt(7) < 5) { // more than 50% recycle
+ recycle.add(a);
+ }
+
+ if (pa == (pap+cSlotSize)) {
+ contiguous++;
+ } else {
+ breaks++;
+ }
+
+ pap = pa;
+
+ if (recycle.size() > 5000) {
+ log.warn("Transient Frees for immediate recyling");
+ for (int e : recycle) {
+ rw.free(e, cAllocSize);
+ }
+ recycle.clear();
+ }
+ }
+
+ store.commit();
+
+ final StringBuilder sb = new StringBuilder();
+ rw.showAllocators(sb);
+
+ log.warn("Contiguous: " + contiguous + ", breaks: " + breaks + "\n" + sb.toString());
+
+ } finally {
+
+ store.destroy();
+
+ }
+
+ }
+
+ /**
* Basic allocation test to ensure the FixedAllocators are operating
* efficiently.
*
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|