|
From: <mar...@us...> - 2014-04-09 07:44:35
|
Revision: 8090
http://sourceforge.net/p/bigdata/code/8090
Author: martyncutcher
Date: 2014-04-09 07:44:27 +0000 (Wed, 09 Apr 2014)
Log Message:
-----------
For ticket #721: fix to BufferedWrite to ensure buffers are zero padded to the slot size when eliding contiguous writes. This caused a potential problem
with binary equivalence for snapshots.
Modified Paths:
--------------
branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java
branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java
branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java
branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2014-04-08 22:24:37 UTC (rev 8089)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2014-04-09 07:44:27 UTC (rev 8090)
@@ -137,6 +137,9 @@
}
+ // Used to zero pad slots in buffered writes
+ final byte[] s_zeros = new byte[256];
+
/**
* Buffer a write.
*
@@ -188,6 +191,19 @@
}
// copy the caller's record into the buffer.
m_data.put(data);
+
+ // if data_len < slot_len then clear remainder of buffer
+ int padding = slot_len - data_len;
+ while (padding > 0) {
+ if (padding > s_zeros.length) {
+ m_data.put(s_zeros);
+ padding -= s_zeros.length;
+ } else {
+ m_data.put(s_zeros, 0, padding);
+ break;
+ }
+ }
+
// update the file offset by the size of the allocation slot
m_endAddr += slot_len;
// update the buffer position by the size of the allocation slot.
@@ -250,8 +266,9 @@
final ByteBuffer m_data = tmp.buffer();
// reset the buffer state.
- m_data.position(0);
- m_data.limit(m_data.capacity());
+ //m_data.position(0);
+ //m_data.limit(m_data.capacity());
+ m_data.clear();
m_startAddr = -1;
m_endAddr = 0;
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2014-04-08 22:24:37 UTC (rev 8089)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2014-04-09 07:44:27 UTC (rev 8090)
@@ -839,6 +839,7 @@
m_statsBucket.allocate(size);
}
+
return value;
} else {
StringBuilder sb = new StringBuilder();
@@ -1300,4 +1301,33 @@
return count;
}
+ /**
+ * Determines if the provided physical address is within an allocated slot
+ * @param addr
+ * @return
+ */
+ public boolean verifyAllocatedAddress(long addr) {
+ if (log.isTraceEnabled())
+ log.trace("Checking Allocator " + m_index + ", size: " + m_size);
+
+ final Iterator<AllocBlock> blocks = m_allocBlocks.iterator();
+ final long range = m_size * m_bitSize * 32;
+ while (blocks.hasNext()) {
+ final int startAddr = blocks.next().m_addr;
+ if (startAddr != 0) {
+ final long start = RWStore.convertAddr(startAddr);
+ final long end = start + range;
+
+ if (log.isTraceEnabled())
+ log.trace("Checking " + addr + " between " + start + " - " + end);
+
+ if (addr >= start && addr < end)
+ return true;
+ } else {
+ break;
+ }
+ }
+ return false;
+ }
+
}
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-04-08 22:24:37 UTC (rev 8089)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-04-09 07:44:27 UTC (rev 8090)
@@ -6963,7 +6963,7 @@
if (log.isDebugEnabled())
log.debug("writeRaw: " + offset);
-
+
// Guard IO against concurrent file extension.
final Lock lock = m_extensionLock.readLock();
@@ -7068,6 +7068,22 @@
}
}
+ /**
+ * Can be used to determine if an address is within an allocated slot.
+ *
+ * @param addr
+ * @return whether addr is within slot allocated area
+ */
+ public boolean verifyAllocatedAddress(final long addr) {
+ for (int index = 0; index < m_allocs.size(); index++) {
+ final FixedAllocator xfa = m_allocs.get(index);
+ if (xfa.verifyAllocatedAddress(addr))
+ return true;
+ }
+
+ return false;
+ }
+
public StoreState getStoreState() {
final RWStoreState ret = new RWStoreState(this);
Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java
===================================================================
--- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java 2014-04-08 22:24:37 UTC (rev 8089)
+++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java 2014-04-09 07:44:27 UTC (rev 8090)
@@ -6,6 +6,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import net.jini.config.Configuration;
+
import com.bigdata.ha.HAGlue;
import com.bigdata.ha.HAStatusEnum;
import com.bigdata.ha.msg.HARootBlockRequest;
@@ -13,11 +15,8 @@
import com.bigdata.ha.msg.IHASnapshotResponse;
import com.bigdata.journal.IRootBlockView;
import com.bigdata.journal.Journal;
-import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.LargeLoadTask;
import com.bigdata.rdf.sail.webapp.client.RemoteRepository;
-import net.jini.config.Configuration;
-
public class TestHA1SnapshotPolicy extends AbstractHA3BackupTestCase {
public TestHA1SnapshotPolicy() {
@@ -438,8 +437,8 @@
*/
public void testA_snapshot_multipleTx_restore_validate() throws Exception {
- final int N1 = 7; // #of transactions to run before the snapshot.
- final int N2 = 8; // #of transactions to run after the snapshot.
+ final int N1 = 7; //7; // #of transactions to run before the snapshot.
+ final int N2 = 8; //8; // #of transactions to run after the snapshot.
// Start service.
final HAGlue serverA = startA();
@@ -459,13 +458,13 @@
// Now run N transactions.
for (int i = 0; i < N1; i++) {
+
+ simpleTransaction();
- simpleTransaction();
-
}
+
+ final long commitCounterN1 = N1 + 1;
- final long commitCounterN1 = N1 + 1;
-
awaitCommitCounter(commitCounterN1, serverA);
/*
@@ -478,7 +477,7 @@
// Snapshot directory is empty.
assertEquals(1, recursiveCount(getSnapshotDirA(),SnapshotManager.SNAPSHOT_FILTER));
-
+
// request snapshot on A.
final Future<IHASnapshotResponse> ft = serverA
.takeSnapshot(new HASnapshotRequest(0/* percentLogSize */));
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|