|
From: <mar...@us...> - 2014-07-03 10:21:32
|
Revision: 8521
http://sourceforge.net/p/bigdata/code/8521
Author: martyncutcher
Date: 2014-07-03 10:21:20 +0000 (Thu, 03 Jul 2014)
Log Message:
-----------
Added CommitState to ensure that the RWStore can reset/abort after an error during or following the RWStore.commit() call - ticket #973.
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-07-02 22:44:27 UTC (rev 8520)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-07-03 10:21:20 UTC (rev 8521)
@@ -343,7 +343,7 @@
String META_BITS_DEMI_SPACE = RWStore.class.getName() + ".metabitsDemispace";
String DEFAULT_META_BITS_DEMI_SPACE = "false";
-
+
/**
* Defines the number of bits that must be free in a FixedAllocator for
* it to be added to the free list. This is used to ensure a level
@@ -824,8 +824,7 @@
m_metaBits = new int[m_metaBitsSize];
m_metaTransientBits = new int[m_metaBitsSize];
-
-
+
m_quorum = quorum;
m_fd = fileMetadata.file;
@@ -1488,7 +1487,6 @@
"Incompatible RWStore header version: storeVersion="
+ storeVersion + ", cVersion=" + cVersion);
}
-
m_lastDeferredReleaseTime = strBuf.readLong();
if (strBuf.readInt() != cDefaultMetaBitsSize) {
throw new IllegalStateException("Store opened with unsupported metabits size");
@@ -2880,15 +2878,19 @@
// }
/**
- * The semantics of reset are to revert unisolated writes to committed state.
- *
+ * The semantics of reset are to revert unisolated writes to committed
+ * state.
+ * <p>
* Unisolated writes must also be removed from the write cache.
- *
+ * <p>
* The AllocBlocks of the FixedAllocators maintain the state to determine
* the correct reset behavior.
- *
+ * <p>
* If the store is using DirectFixedAllocators then an IllegalStateException
- * is thrown
+ * is thrown.
+ * <p>
+ * If there is an active {@link #m_commitStateRef}, a failure occurred after
+ * {@link RWStore#commit()} had "succeeded"; the pre-commit state is restored.
*/
public void reset() {
@@ -2899,7 +2901,16 @@
try {
assertOpen();
// assertNoRebuild();
+
+ final CommitState commitState = m_commitStateRef
+ .getAndSet(null/* newValue */);
+
+ if (commitState != null) {
+ commitState.reset(); // restore state values on RWStore.
+
+ }
+
boolean isolatedWrites = false;
/**
* Clear all allocators, not just dirty allocators, since we also
@@ -3101,23 +3112,89 @@
return requiresCommit();
}
-// static final float s_version = 3.0f;
-//
-// public String getVersionString() {
-// return "RWStore " + s_version;
-// }
+ /**
+ * Object recording the undo state for the {@link RWStore#commit()} ...
+ * {@link RWStore#postCommit()} sequence. Each {@link CommitState} must end
+ * with either {@link CommitState#postCommit()} or {@link CommitState#reset()}.
+ * Those {@link CommitState} methods are invoked from the corresponding
+ * {@link RWStore} methods.
+ *
+ * @see <a href="http://trac.bigdata.com/ticket/973" >RWStore commit is not
+ * robust to internal failure.</a>
+ */
+ private class CommitState {
+ /*
+ * Critical pre-commit state that must be restored if a commit is
+ * discarded.
+ */
+ private final int m_lastCommittedNextAllocation;
+ private final long m_storageStatsAddr;
+ private final int m_metaBitsAddr;
+ CommitState() {
+ // retain copy of critical pre-commit state
+ if (!m_allocationWriteLock.isHeldByCurrentThread())
+ throw new IllegalMonitorStateException();
+ m_lastCommittedNextAllocation = RWStore.this.m_committedNextAllocation;
+ m_storageStatsAddr = RWStore.this.m_storageStatsAddr;
+ m_metaBitsAddr = RWStore.this.m_metaBitsAddr;
+ }
+
+ void postCommit() {
+
+ // NOP
+
+ }
+
+ /** Reset pre-commit state to support reset/abort/rollback. */
+ void reset() {
+ if (!m_allocationWriteLock.isHeldByCurrentThread())
+ throw new IllegalMonitorStateException();
+ RWStore.this.m_storageStatsAddr = m_storageStatsAddr;
+ RWStore.this.m_committedNextAllocation = m_lastCommittedNextAllocation;
+ RWStore.this.m_metaBitsAddr = m_metaBitsAddr;
+ }
+
+ }
+
+ /**
+ * @see <a href="http://trac.bigdata.com/ticket/973" >RWStore commit is not
+ * robust to internal failure.</a>
+ */
+ private final AtomicReference<CommitState> m_commitStateRef = new AtomicReference<CommitState>();
+
+ /**
+ * Package private method used by the test suite.
+ */
+ void clearCommitStateRef() {
+
+ m_commitStateRef.set(null/* newValue */);
+
+ }
+
+ @Override
public void commit() {
assertOpen();
// assertNoRebuild();
checkCoreAllocations();
- // take allocation lock to prevent other threads allocating during commit
+ // take allocation lock to prevent other threads allocating during commit
m_allocationWriteLock.lock();
try {
+ /*
+ * Create a transient object that retains the pre-commit state so
+ * that an abort/reset/rollback requested after this commit() can
+ * restore it.
+ */
+ if (!m_commitStateRef.compareAndSet(null/* expect */,
+ new CommitState())) {
+ throw new IllegalStateException(
+ "RWStore commitState found, incomplete previous commit must be rolled back/aborted");
+ }
+
// final int totalFreed = checkDeferredFrees(true, journal); // free now if possible
//
// if (totalFreed > 0 && log.isInfoEnabled()) {
@@ -3150,20 +3227,20 @@
if ((!m_useMetabitsDemispace) && reqmbc < m_maxFixedAlloc) {
nmbaddr = alloc(reqmbc, null);
}
-
+
// If existing allocation, then free it
- if (m_metaBitsAddr < 0) {
-
+ if (m_metaBitsAddr < 0) {
+
final int oldMetaBitsSize = (m_metaBits.length
+ m_allocSizes.length + 1) * 4;
- // Call immediateFree - no need to defer freeof metaBits, this
- // has to stop somewhere!
- // No more allocations must be made
- immediateFree((int) m_metaBitsAddr, oldMetaBitsSize);
-
- }
-
+ // Call immediateFree - no need to defer free of metaBits, this
+ // has to stop somewhere!
+ // No more allocations must be made
+ immediateFree((int) m_metaBitsAddr, oldMetaBitsSize);
+
+ }
+
m_metaBitsAddr = nmbaddr;
}
@@ -3188,8 +3265,8 @@
}
if (m_metaBitsAddr > 0) { // Demi-Space
- // Now "toggle" m_metaBitsAddr - 64K boundary
- m_metaBitsAddr ^= 0x01; // toggle zero or 64K offset
+ // Now "toggle" m_metaBitsAddr - 64K boundary
+ m_metaBitsAddr ^= 0x01; // toggle zero or 64K offset
}
if (log.isDebugEnabled()) {
@@ -3199,7 +3276,7 @@
} else {
mbaddr = convertAddr(-m_metaBitsAddr); // maximum 48 bit address range
}
-
+
log.debug("Writing metabits at " + mbaddr);
}
@@ -3254,9 +3331,6 @@
throw new RuntimeException(e);
}
- // Now remember the committed next allocation that will be checked in reset()
- m_committedNextAllocation = m_nextAllocation;
-
// Should not write rootBlock, this is responsibility of client
// to provide control
// writeFileSpec();
@@ -3291,12 +3365,13 @@
log.debug(showAllocatorList());
}
-
+
}
/**
* {@inheritDoc}
*/
+ @Override
public Lock getCommitLock() {
return m_allocationWriteLock;
@@ -3308,11 +3383,25 @@
* <p>
* Commits the FixedAllocator bits
*/
+ @Override
public void postCommit() {
if (!m_allocationWriteLock.isHeldByCurrentThread())
throw new IllegalMonitorStateException();
+ final CommitState commitState = m_commitStateRef.getAndSet(null/* newValue */);
+
+ if (commitState == null) {
+
+ throw new IllegalStateException(
+ "No current CommitState found on postCommit");
+
+ } else {
+
+ commitState.postCommit();
+
+ }
+
for (FixedAllocator fa : m_commitList) {
fa.postCommit();
@@ -3323,6 +3412,7 @@
}
+ @Override
public int checkDeferredFrees(final AbstractJournal journal) {
if (journal == null)
@@ -4150,7 +4240,7 @@
try {
if (addr >= 0) {
-
+
return addr & 0xFFFFFFE0;
} else {
@@ -4277,31 +4367,31 @@
*
* @return long representation of metaBitsAddr PLUS the size
*/
- public long getMetaBitsAddr() {
+ public long getMetaBitsAddr() {
long ret = 0;
-
+
if (m_metaBitsAddr < 0) {
ret = physicalAddress((int) m_metaBitsAddr);
} else {
- // long ret = physicalAddress((int) m_metaBitsAddr);
+ // long ret = physicalAddress((int) m_metaBitsAddr);
ret = convertAddr(-m_metaBitsAddr); // maximum 48 bit address range
}
- ret <<= 16;
-
+ ret <<= 16;
+
// include space for version, allocSizes and deferred free info AND
// cDefaultMetaBitsSize
final int metaBitsSize = cMetaHdrFields + m_metaBits.length
+ m_allocSizes.length;
- ret += metaBitsSize;
+ ret += metaBitsSize;
+
+ if (log.isTraceEnabled())
+ log.trace("Returning metabitsAddr: " + ret + ", for "
+ + m_metaBitsAddr + " - " + m_metaBits.length + ", "
+ + metaBitsSize);
- if (log.isTraceEnabled())
- log.trace("Returning metabitsAddr: " + ret + ", for "
- + m_metaBitsAddr + " - " + m_metaBits.length + ", "
- + metaBitsSize);
+ return ret;
+ }
- return ret;
- }
-
/**
*
* @return the address of the metaBits
@@ -5178,7 +5268,7 @@
checkRootBlock(rootBlock);
assertOpen();
-
+
if (log.isTraceEnabled()) {
log.trace("Writing new rootblock with commitCounter: "
+ rootBlock.getCommitCounter() + ", commitRecordAddr: "
@@ -7242,7 +7332,7 @@
return ret;
}
-
+
/**
* Forces a reset of the metabits allocation on the next commit.
* <p>
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2014-07-02 22:44:27 UTC (rev 8520)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2014-07-03 10:21:20 UTC (rev 8521)
@@ -27,24 +27,16 @@
package com.bigdata.rwstore;
-import java.io.EOFException;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Properties;
import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
import junit.extensions.proxy.ProxyTestSuite;
import junit.framework.Test;
@@ -577,8 +569,6 @@
*
* @author <a href="mailto:tho...@us...">Bryan
* Thompson</a>
- * @version $Id: TestRWJournal.java 4010 2010-12-16 12:44:43Z martyncutcher
- * $
*/
public static class TestRawStore extends AbstractRestartSafeTestCase {
@@ -1540,116 +1530,23 @@
public void test_metaAlloc() {
Journal store = (Journal) getStore();
- try {
+ try {
- final RWStrategy bs = (RWStrategy) store.getBufferStrategy();
+ final RWStrategy bs = (RWStrategy) store.getBufferStrategy();
- final RWStore rw = bs.getStore();
- long realAddr = 0;
- for (int r = 0; r < 100; r++) {
- for (int i = 0; i < 1000; i++) {
- int allocAddr = rw.metaAlloc();
-
- realAddr = rw.metaBit2Addr(allocAddr);
- }
- rw.commit();
+ final RWStore rw = bs.getStore();
+ long realAddr = 0;
+ for (int i = 0; i < 100000; i++) {
+ int allocAddr = rw.metaAlloc();
+
+ realAddr = rw.metaBit2Addr(allocAddr);
}
-
- if (log.isInfoEnabled())
- log.info("metaAlloc lastAddr: " + realAddr);
+ if(log.isInfoEnabled())log.info("metaAlloc lastAddr: " + realAddr);
} finally {
store.destroy();
}
}
-
- /**
- * Tests the MetabitsUtil to switch the demispace.
- *
- * If the address is a demispace then addr % 64K == 0.
- *
- * If the address is NOT a demispace then it should be less than first
- * demispace
- */
- public void test_metabitsDemispace() {
- Journal store = (Journal) getStore();
- try {
- RWStrategy bs = (RWStrategy) store.getBufferStrategy();
- RWStore rw = bs.getStore();
- final String fname = rw.getStoreFile().getAbsolutePath();
-
- store.commit();
-
- final long fa1 = rw.getMetaBitsStoreAddress();
-
- rw.ensureMetabitsDemispace(true);
- store.commit();
-
- final long ds1 = rw.getMetaBitsStoreAddress();
-
- assertTrue((ds1 & 0xFFFF) == 0); // MOD 64K
- assertTrue(ds1 > fa1);
-
- rw.ensureMetabitsDemispace(false);
- store.commit();
-
- final long fa2 = rw.getMetaBitsStoreAddress();
-
- assertTrue(ds1 > fa2);
-
- rw.ensureMetabitsDemispace(true);
- store.commit();
-
- final long ds2 = rw.getMetaBitsStoreAddress();
-
- assertTrue((ds2 & 0xFFFF) == 0);
- assertTrue(ds2 > ds1);
-
- // Now use MetaBitsUtil
-
- store.close();
-
- MetabitsUtil.main(new String[] { "-store", fname, "-usedemispace", "false"});
-
- store = getExplicitStore(fname);
-
- bs = (RWStrategy) store.getBufferStrategy();
- rw = bs.getStore();
- final long fa3 = rw.getMetaBitsStoreAddress();
-
- assertTrue(fa3 < ds1);
-
- store.close();
-
- MetabitsUtil.main(new String[] { "-store", fname, "-usedemispace", "true"});
-
- store = getExplicitStore(fname);
-
- bs = (RWStrategy) store.getBufferStrategy();
- rw = bs.getStore();
-
- final long ds3 = rw.getMetaBitsStoreAddress();
- assertTrue((ds3 & 0xFFFF) == 0);
- assertTrue(ds3 > ds2);
-
- } finally {
- store.destroy();
- }
- }
-
- Journal getExplicitStore(String storeFile) {
-
- final Properties properties = new Properties();
-
- properties.setProperty(Options.FILE, storeFile);
-
- properties.setProperty(Options.BUFFER_MODE,
- BufferMode.DiskRW.toString());
-
- return new Journal(properties);// .getBufferStrategy();
-
- }
-
static class DummyAllocationContext implements IAllocationContext {
}
@@ -2104,7 +2001,136 @@
store.destroy();
}
}
+
+ /**
+ * Verify that we correctly restore the RWStore commit state if
+ * {@link RWStore#commit()} is followed by {@link RWStore#reset()}
+ * rather than {@link RWStore#postCommit()}.
+ *
+ * @see <a href="http://trac.bigdata.com/ticket/973" >RWStore commit is
+ * not robust to internal failure.</a>
+ */
+ public void test_commitState() {
+ Journal store = (Journal) getStore();
+ try {
+
+ final RWStrategy bs = (RWStrategy) store.getBufferStrategy();
+
+ final RWStore rws = bs.getStore();
+
+ final long addr = bs.write(randomData(78));
+
+ // do 1st half of the RWStore commit protocol.
+ rws.commit();
+
+ // then discard write set.
+ store.abort();
+
+ assertFalse(bs.isCommitted(addr)); // rolled back
+
+ // now cycle standard commit to confirm correct reset
+ for (int c = 0; c < 50; c++) {
+ bs.write(randomData(78));
+ store.commit();
+ }
+
+
+ } finally {
+ store.destroy();
+ }
+ }
+ /**
+ * Test verifies that a failure to retain the commit state in
+ * {@link RWStore#commit()} will cause problems if the write set is
+ * discarded by {@link RWStore#reset()} such that subsequent write sets
+ * run into persistent addressing errors.
+ *
+ * @see <a href="http://trac.bigdata.com/ticket/973" >RWStore commit is
+ * not robust to internal failure.</a>
+ */
+ public void test_commitStateError() {
+ Journal store = (Journal) getStore();
+ try {
+
+ RWStrategy bs = (RWStrategy) store.getBufferStrategy();
+
+ RWStore rws = bs.getStore();
+
+ final long addr = bs.write(randomData(78));
+
+ // do first half of the RWStore protocol.
+ rws.commit();
+
+ /*
+ * remove the commit state such that subsequent abort()/reset()
+ * will fail to correctly restore the pre-commit state.
+ */
+ rws.clearCommitStateRef();
+
+ // abort() succeeds because it is allowed even if commit() was
+ // not called.
+ store.abort();
+
+ assertFalse(bs.isCommitted(addr)); // rolled back
+
+ try {
+ // now cycle standard commit to force an error from bad reset
+ for (int c = 0; c < 50; c++) {
+ bs.write(randomData(78));
+ store.commit();
+ }
+ fail("Expected failure");
+ } catch (Exception e) {
+ // expected
+ log.info("Expected!");
+ }
+
+ } finally {
+ store.destroy();
+ }
+ }
+
+ /**
+ * Verify that a double-commit causes an illegal state exception.
+ * Further verify that {@link RWStore#reset()} allows us to then
+ * apply and commit new write sets.
+ *
+ * @see <a href="http://trac.bigdata.com/ticket/973" >RWStore commit is
+ * not robust to internal failure.</a>
+ */
+ public void test_commitStateIllegal() {
+ final Journal store = (Journal) getStore();
+ try {
+
+ final RWStrategy bs = (RWStrategy) store.getBufferStrategy();
+
+ final RWStore rws = bs.getStore();
+
+ bs.write(randomData(78));
+
+ rws.commit();
+
+ try {
+ store.commit();
+
+ fail("Expected failure");
+ } catch (Exception ise) {
+ if (InnerCause.isInnerCause(ise, IllegalStateException.class)) {
+ store.abort();
+
+ store.commit();
+ } else {
+ fail("Unexpected Exception");
+ }
+ }
+
+
+ } finally {
+ store.destroy();
+ }
+ }
+
public void test_allocCommitFreeWithHistory() {
Journal store = (Journal) getStore(4);
try {
@@ -3052,8 +3078,6 @@
*
* @author <a href="mailto:tho...@us...">Bryan
* Thompson</a>
- * @version $Id: TestRWJournal.java 4010 2010-12-16 12:44:43Z martyncutcher
- * $
*/
public static class TestMROW extends AbstractMROWTestCase {
@@ -3065,15 +3089,11 @@
super(name);
}
- protected IRawStore getStore(String storeFile) {
+ protected IRawStore getStore() {
final Properties properties = getProperties();
- if (storeFile == null) {
- properties.setProperty(Options.CREATE_TEMP_FILE, "true");
- } else {
- properties.setProperty(Options.FILE, storeFile);
- }
+ properties.setProperty(Options.CREATE_TEMP_FILE, "true");
properties.setProperty(Options.DELETE_ON_EXIT, "true");
@@ -3085,86 +3105,6 @@
}
- protected IRawStore getStore() {
-
- return getStore(null); // no file provided by default
-
- }
-
- static long getLongArg(final String[] args, final String arg, final long def) {
- final String sv = getArg(args, arg, null);
-
- return sv == null ? def : Long.parseLong(sv);
- }
-
- static String getArg(final String[] args, final String arg, final String def) {
- for (int p = 0; p < args.length; p+=2) {
- if (arg.equals(args[p]))
- return args[p+1];
- }
-
- return def;
- }
-
- /**
- * Stress variant to support multiple parameterised runs
- *
- * Arguments
- *
- * -file - optional explicit file path
- * -clients - reader threads
- * -nwrites - number of records written
- * -reclen - size of record written
- * -ntrials - number of readers
- * -nreads - number of reads made by each reader
- * -nruns - number of times to repeat process with reopen each time
- */
- public static void main(final String[] args) throws Exception {
- final TestMROW test = new TestMROW("main");
-
- final String storeFile = getArg(args, "-file", null);
-
- Journal store = (Journal) test.getStore(storeFile);
- try {
-
- final long timeout = 20;
-
- final int nclients = (int) getLongArg(args, "-clients", 20); // 20
-
- final long nwrites = getLongArg(args, "-nwrites", 100000); //1000000;
-
- final int writeDelayMillis = 1;
-
- final long ntrials = getLongArg(args, "-ntrials", 100000); // 100000;
-
- final int reclen = (int) getLongArg(args, "-reclen", 128); // 128;
-
- final long nreads = getLongArg(args, "-nreads", 1000); // 1000;
-
- final long nruns = getLongArg(args, "-nruns", 1); // 1000;
-
- final AtomicInteger nerr = new AtomicInteger();
-
- for (int i = 0; i < nruns; i++) {
- doMROWTest(store, nwrites, writeDelayMillis, timeout, nclients,
- ntrials, reclen, nreads, nerr, true /*readAll*/);
-
- store.commit();
-
- store = (new TestRWJournal()).reopenStore(store);
-
- System.out.println("Completed run: " + i);
- }
-
- } finally {
-
- if (storeFile == null)
- store.destroy();
-
- }
-
- }
-
}
/**
@@ -3172,8 +3112,6 @@
*
* @author <a href="mailto:tho...@us...">Bryan
* Thompson</a>
- * @version $Id: TestRWJournal.java 4010 2010-12-16 12:44:43Z martyncutcher
- * $
*/
public static class TestMRMW extends AbstractMRMWTestCase {
@@ -3208,8 +3146,6 @@
*
* @author <a href="mailto:tho...@us...">Bryan
* Thompson</a>
- * @version $Id: TestRWJournal.java 4010 2010-12-16 12:44:43Z martyncutcher
- * $
*/
public static class TestInterrupts extends AbstractInterruptsTestCase {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|