From: <mar...@us...> - 2014-07-03 10:21:32
|
Revision: 8521 http://sourceforge.net/p/bigdata/code/8521 Author: martyncutcher Date: 2014-07-03 10:21:20 +0000 (Thu, 03 Jul 2014) Log Message: ----------- Added CommitState to ensure that the RWStore is able to reset/abort after an error that occurs during or after the RWStore.commit() call - ticket #973. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-07-02 22:44:27 UTC (rev 8520) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-07-03 10:21:20 UTC (rev 8521) @@ -343,7 +343,7 @@ String META_BITS_DEMI_SPACE = RWStore.class.getName() + ".metabitsDemispace"; String DEFAULT_META_BITS_DEMI_SPACE = "false"; - + /** * Defines the number of bits that must be free in a FixedAllocator for * it to be added to the free list. This is used to ensure a level @@ -824,8 +824,7 @@ m_metaBits = new int[m_metaBitsSize]; m_metaTransientBits = new int[m_metaBitsSize]; - - + m_quorum = quorum; m_fd = fileMetadata.file; @@ -1488,7 +1487,6 @@ "Incompatible RWStore header version: storeVersion=" + storeVersion + ", cVersion=" + cVersion); } - m_lastDeferredReleaseTime = strBuf.readLong(); if (strBuf.readInt() != cDefaultMetaBitsSize) { throw new IllegalStateException("Store opened with unsupported metabits size"); @@ -2880,15 +2878,19 @@ // } /** - * The semantics of reset are to revert unisolated writes to committed state. - * + * The semantics of reset are to revert unisolated writes to committed + * state. + * <p> * Unisolated writes must also be removed from the write cache.
- * + * <p> * The AllocBlocks of the FixedAllocators maintain the state to determine * the correct reset behavior. - * + * <p> * If the store is using DirectFixedAllocators then an IllegalStateException - * is thrown + * is thrown. + * <p> + * If there is an active {@link #m_commitStateRef}, then this indicates a + * failure after the {@link RWStore#commit()} had "succeeded". */ public void reset() { @@ -2899,7 +2901,16 @@ try { assertOpen(); // assertNoRebuild(); + + final CommitState commitState = m_commitStateRef + .getAndSet(null/* newValue */); + + if (commitState != null) { + commitState.reset(); // restore state values on RWStore. + + } + boolean isolatedWrites = false; /** * Clear all allocators, not just dirty allocators, since we also @@ -3101,23 +3112,89 @@ return requiresCommit(); } -// static final float s_version = 3.0f; -// -// public String getVersionString() { -// return "RWStore " + s_version; -// } + /** + * Object recording the undo state for the {@link RWStore#commit()} ... + * {@link RWStore#postCommit()} sequence. The {@link CommitState} must + * be completed by either {@link CommitState#postCommit()} or {@link CommitState#reset()}. Those + * {@link CommitState} methods are invoked from the corresponding + * {@link RWStore} methods. + * + * @see <a href="http://trac.bigdata.com/ticket/973" >RWStore commit is not + * robust to internal failure.</a> + */ + private class CommitState { + /* + * Critical pre-commit state that must be restored if a commit is + * discarded.
+ */ + private final int m_lastCommittedNextAllocation; + private final long m_storageStatsAddr; + private final int m_metaBitsAddr; + CommitState() { + // retain copy of critical pre-commit state + if (!m_allocationWriteLock.isHeldByCurrentThread()) + throw new IllegalMonitorStateException(); + m_lastCommittedNextAllocation = RWStore.this.m_committedNextAllocation; + m_storageStatsAddr = RWStore.this.m_storageStatsAddr; + m_metaBitsAddr = RWStore.this.m_metaBitsAddr; + } + + void postCommit() { + + // NOP + + } + + /** Reset pre-commit state to support reset/abort/rollback. */ + void reset() { + if (!m_allocationWriteLock.isHeldByCurrentThread()) + throw new IllegalMonitorStateException(); + RWStore.this.m_storageStatsAddr = m_storageStatsAddr; + RWStore.this.m_committedNextAllocation = m_lastCommittedNextAllocation; + RWStore.this.m_metaBitsAddr = m_metaBitsAddr; + } + + } + + /** + * @see <a href="http://trac.bigdata.com/ticket/973" >RWStore commit is not + * robust to internal failure.</a> + */ + private final AtomicReference<CommitState> m_commitStateRef = new AtomicReference<CommitState>(); + + /** + * Package private method used by the test suite. + */ + void clearCommitStateRef() { + + m_commitStateRef.set(null/* newValue */); + + } + + @Override public void commit() { assertOpen(); // assertNoRebuild(); checkCoreAllocations(); - // take allocation lock to prevent other threads allocating during commit + // take allocation lock to prevent other threads allocating during commit m_allocationWriteLock.lock(); try { + /* + * Create a transient object to retain values of previous + * commitState to support abort/reset/rollback if requested after + * this commit() is requested. 
+ */ + if (!m_commitStateRef.compareAndSet(null/* expect */, + new CommitState())) { + throw new IllegalStateException( + "RWStore commitState found, incomplete previous commit must be rolled back/aborted"); + } + // final int totalFreed = checkDeferredFrees(true, journal); // free now if possible // // if (totalFreed > 0 && log.isInfoEnabled()) { @@ -3150,20 +3227,20 @@ if ((!m_useMetabitsDemispace) && reqmbc < m_maxFixedAlloc) { nmbaddr = alloc(reqmbc, null); } - + // If existing allocation, then free it - if (m_metaBitsAddr < 0) { - + if (m_metaBitsAddr < 0) { + final int oldMetaBitsSize = (m_metaBits.length + m_allocSizes.length + 1) * 4; - // Call immediateFree - no need to defer freeof metaBits, this - // has to stop somewhere! - // No more allocations must be made - immediateFree((int) m_metaBitsAddr, oldMetaBitsSize); - - } - + // Call immediateFree - no need to defer freeof metaBits, this + // has to stop somewhere! + // No more allocations must be made + immediateFree((int) m_metaBitsAddr, oldMetaBitsSize); + + } + m_metaBitsAddr = nmbaddr; } @@ -3188,8 +3265,8 @@ } if (m_metaBitsAddr > 0) { // Demi-Space - // Now "toggle" m_metaBitsAddr - 64K boundary - m_metaBitsAddr ^= 0x01; // toggle zero or 64K offset + // Now "toggle" m_metaBitsAddr - 64K boundary + m_metaBitsAddr ^= 0x01; // toggle zero or 64K offset } if (log.isDebugEnabled()) { @@ -3199,7 +3276,7 @@ } else { mbaddr = convertAddr(-m_metaBitsAddr); // maximum 48 bit address range } - + log.debug("Writing metabits at " + mbaddr); } @@ -3254,9 +3331,6 @@ throw new RuntimeException(e); } - // Now remember the committed next allocation that will be checked in reset() - m_committedNextAllocation = m_nextAllocation; - // Should not write rootBlock, this is responsibility of client // to provide control // writeFileSpec(); @@ -3291,12 +3365,13 @@ log.debug(showAllocatorList()); } - + } /** * {@inheritDoc} */ + @Override public Lock getCommitLock() { return m_allocationWriteLock; @@ -3308,11 +3383,25 @@ * 
<p> * Commits the FixedAllocator bits */ + @Override public void postCommit() { if (!m_allocationWriteLock.isHeldByCurrentThread()) throw new IllegalMonitorStateException(); + final CommitState commitState = m_commitStateRef.getAndSet(null/* newValue */); + + if (commitState == null) { + + throw new IllegalStateException( + "No current CommitState found on postCommit"); + + } else { + + commitState.postCommit(); + + } + for (FixedAllocator fa : m_commitList) { fa.postCommit(); @@ -3323,6 +3412,7 @@ } + @Override public int checkDeferredFrees(final AbstractJournal journal) { if (journal == null) @@ -4150,7 +4240,7 @@ try { if (addr >= 0) { - + return addr & 0xFFFFFFE0; } else { @@ -4277,31 +4367,31 @@ * * @return long representation of metaBitsAddr PLUS the size */ - public long getMetaBitsAddr() { + public long getMetaBitsAddr() { long ret = 0; - + if (m_metaBitsAddr < 0) { ret = physicalAddress((int) m_metaBitsAddr); } else { - // long ret = physicalAddress((int) m_metaBitsAddr); + // long ret = physicalAddress((int) m_metaBitsAddr); ret = convertAddr(-m_metaBitsAddr); // maximum 48 bit address range } - ret <<= 16; - + ret <<= 16; + // include space for version, allocSizes and deferred free info AND // cDefaultMetaBitsSize final int metaBitsSize = cMetaHdrFields + m_metaBits.length + m_allocSizes.length; - ret += metaBitsSize; + ret += metaBitsSize; + + if (log.isTraceEnabled()) + log.trace("Returning metabitsAddr: " + ret + ", for " + + m_metaBitsAddr + " - " + m_metaBits.length + ", " + + metaBitsSize); - if (log.isTraceEnabled()) - log.trace("Returning metabitsAddr: " + ret + ", for " - + m_metaBitsAddr + " - " + m_metaBits.length + ", " - + metaBitsSize); + return ret; + } - return ret; - } - /** * * @return the address of the metaBits @@ -5178,7 +5268,7 @@ checkRootBlock(rootBlock); assertOpen(); - + if (log.isTraceEnabled()) { log.trace("Writing new rootblock with commitCounter: " + rootBlock.getCommitCounter() + ", commitRecordAddr: " @@ -7242,7 +7332,7 @@ 
return ret; } - + /** * Forces a reset of the metabits allocation on the next commit. * <p> Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2014-07-02 22:44:27 UTC (rev 8520) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2014-07-03 10:21:20 UTC (rev 8521) @@ -27,24 +27,16 @@ package com.bigdata.rwstore; -import java.io.EOFException; import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Properties; import java.util.Random; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; import junit.extensions.proxy.ProxyTestSuite; import junit.framework.Test; @@ -577,8 +569,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id: TestRWJournal.java 4010 2010-12-16 12:44:43Z martyncutcher - * $ */ public static class TestRawStore extends AbstractRestartSafeTestCase { @@ -1540,116 +1530,23 @@ public void test_metaAlloc() { Journal store = (Journal) getStore(); - try { + try { - final RWStrategy bs = (RWStrategy) store.getBufferStrategy(); + final RWStrategy bs = (RWStrategy) store.getBufferStrategy(); - final RWStore rw = bs.getStore(); - long realAddr = 0; - for (int r = 0; r < 100; r++) { - for (int i = 0; i < 1000; i++) { - int allocAddr = rw.metaAlloc(); - - realAddr = rw.metaBit2Addr(allocAddr); - } - rw.commit(); + final RWStore rw = bs.getStore(); + long realAddr = 0; + for (int i = 0; i < 100000; i++) 
{ + int allocAddr = rw.metaAlloc(); + + realAddr = rw.metaBit2Addr(allocAddr); } - - if (log.isInfoEnabled()) - log.info("metaAlloc lastAddr: " + realAddr); + if(log.isInfoEnabled())log.info("metaAlloc lastAddr: " + realAddr); } finally { store.destroy(); } } - - /** - * Tests the MetabitsUtil to switch the demispace. - * - * If the address is a demispace then addr % 64K == 0. - * - * If the address is NOT a demispace then it should be less than first - * demispace - */ - public void test_metabitsDemispace() { - Journal store = (Journal) getStore(); - try { - RWStrategy bs = (RWStrategy) store.getBufferStrategy(); - RWStore rw = bs.getStore(); - final String fname = rw.getStoreFile().getAbsolutePath(); - - store.commit(); - - final long fa1 = rw.getMetaBitsStoreAddress(); - - rw.ensureMetabitsDemispace(true); - store.commit(); - - final long ds1 = rw.getMetaBitsStoreAddress(); - - assertTrue((ds1 & 0xFFFF) == 0); // MOD 64K - assertTrue(ds1 > fa1); - - rw.ensureMetabitsDemispace(false); - store.commit(); - - final long fa2 = rw.getMetaBitsStoreAddress(); - - assertTrue(ds1 > fa2); - - rw.ensureMetabitsDemispace(true); - store.commit(); - - final long ds2 = rw.getMetaBitsStoreAddress(); - - assertTrue((ds2 & 0xFFFF) == 0); - assertTrue(ds2 > ds1); - - // Now use MetaBitsUtil - - store.close(); - - MetabitsUtil.main(new String[] { "-store", fname, "-usedemispace", "false"}); - - store = getExplicitStore(fname); - - bs = (RWStrategy) store.getBufferStrategy(); - rw = bs.getStore(); - final long fa3 = rw.getMetaBitsStoreAddress(); - - assertTrue(fa3 < ds1); - - store.close(); - - MetabitsUtil.main(new String[] { "-store", fname, "-usedemispace", "true"}); - - store = getExplicitStore(fname); - - bs = (RWStrategy) store.getBufferStrategy(); - rw = bs.getStore(); - - final long ds3 = rw.getMetaBitsStoreAddress(); - assertTrue((ds3 & 0xFFFF) == 0); - assertTrue(ds3 > ds2); - - } finally { - store.destroy(); - } - } - - Journal getExplicitStore(String storeFile) { - - 
final Properties properties = new Properties(); - - properties.setProperty(Options.FILE, storeFile); - - properties.setProperty(Options.BUFFER_MODE, - BufferMode.DiskRW.toString()); - - return new Journal(properties);// .getBufferStrategy(); - - } - static class DummyAllocationContext implements IAllocationContext { } @@ -2104,7 +2001,136 @@ store.destroy(); } } + + /** + * Verify that we correctly restore the RWStore commit state if + * {@link RWStore#commit()} is followed by {@link RWStore#reset()} + * rather than {@link RWStore#postCommit()}. + * + * @see <a href="http://trac.bigdata.com/ticket/973" >RWStore commit is + * not robust to internal failure.</a> + */ + public void test_commitState() { + Journal store = (Journal) getStore(); + try { + + final RWStrategy bs = (RWStrategy) store.getBufferStrategy(); + + final RWStore rws = bs.getStore(); + + final long addr = bs.write(randomData(78)); + + // do 1st half of the RWStore commit protocol. + rws.commit(); + + // then discard write set. + store.abort(); + + assertFalse(bs.isCommitted(addr)); // rolled back + + // now cycle standard commit to confirm correct reset + for (int c = 0; c < 50; c++) { + bs.write(randomData(78)); + store.commit(); + } + + + } finally { + store.destroy(); + } + } + /** + * Test verifies that a failure to retain the commit state in + * {@link RWStore#commit()} will cause problems if the write set is + * discarded by {@link RWStore#reset()} such that subsequent write sets + * run into persistent addressing errors. + * + * @see <a href="http://trac.bigdata.com/ticket/973" >RWStore commit is + * not robust to internal failure.</a> + */ + public void test_commitStateError() { + Journal store = (Journal) getStore(); + try { + + RWStrategy bs = (RWStrategy) store.getBufferStrategy(); + + RWStore rws = bs.getStore(); + + final long addr = bs.write(randomData(78)); + + // do first half of the RWStore protocol. 
+ rws.commit(); + + /* + * remove the commit state such that subsequent abort()/reset() + * will fail to correctly restore the pre-commit state. + */ + rws.clearCommitStateRef(); + + // abort() succeeds because it is allowed even if commit() was + // not called. + store.abort(); + + assertFalse(bs.isCommitted(addr)); // rolled back + + try { + // now cycle standard commit to force an error from bad reset + for (int c = 0; c < 50; c++) { + bs.write(randomData(78)); + store.commit(); + } + fail("Expected failure"); + } catch (Exception e) { + // expected + log.info("Expected!"); + } + + } finally { + store.destroy(); + } + } + + /** + * Verify that a double-commit causes an illegal state exception. + * Further verify that an {@link RWStore#reset()} allows us to then + * apply and commit new write sets. + * + * @see <a href="http://trac.bigdata.com/ticket/973" >RWStore commit is + * not robust to internal failure.</a> + */ + public void test_commitStateIllegal() { + final Journal store = (Journal) getStore(); + try { + + final RWStrategy bs = (RWStrategy) store.getBufferStrategy(); + + final RWStore rws = bs.getStore(); + + bs.write(randomData(78)); + + rws.commit(); + + try { + store.commit(); + + fail("Expected failure"); + } catch (Exception ise) { + if (InnerCause.isInnerCause(ise, IllegalStateException.class)) { + store.abort(); + + store.commit(); + } else { + fail("Unexpected Exception"); + } + } + + + } finally { + store.destroy(); + } + } + public void test_allocCommitFreeWithHistory() { Journal store = (Journal) getStore(4); try { @@ -3052,8 +3078,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id: TestRWJournal.java 4010 2010-12-16 12:44:43Z martyncutcher - * $ */ public static class TestMROW extends AbstractMROWTestCase { @@ -3065,15 +3089,11 @@ super(name); } - protected IRawStore getStore(String storeFile) { + protected IRawStore getStore() { final Properties properties = getProperties(); - if (storeFile == null) { -
properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - } else { - properties.setProperty(Options.FILE, storeFile); - } + properties.setProperty(Options.CREATE_TEMP_FILE, "true"); properties.setProperty(Options.DELETE_ON_EXIT, "true"); @@ -3085,86 +3105,6 @@ } - protected IRawStore getStore() { - - return getStore(null); // no file provided by default - - } - - static long getLongArg(final String[] args, final String arg, final long def) { - final String sv = getArg(args, arg, null); - - return sv == null ? def : Long.parseLong(sv); - } - - static String getArg(final String[] args, final String arg, final String def) { - for (int p = 0; p < args.length; p+=2) { - if (arg.equals(args[p])) - return args[p+1]; - } - - return def; - } - - /** - * Stress variant to support multiple parameterised runs - * - * Arguments - * - * -file - optional explicit file path - * -clients - reader threads - * -nwrites - number of records written - * -reclen - size of record written - * -ntrials - number of readers - * -nreads - number of reads made by each reader - * -nruns - number of times to repeat process with reopen each time - */ - public static void main(final String[] args) throws Exception { - final TestMROW test = new TestMROW("main"); - - final String storeFile = getArg(args, "-file", null); - - Journal store = (Journal) test.getStore(storeFile); - try { - - final long timeout = 20; - - final int nclients = (int) getLongArg(args, "-clients", 20); // 20 - - final long nwrites = getLongArg(args, "-nwrites", 100000); //1000000; - - final int writeDelayMillis = 1; - - final long ntrials = getLongArg(args, "-ntrials", 100000); // 100000; - - final int reclen = (int) getLongArg(args, "-reclen", 128); // 128; - - final long nreads = getLongArg(args, "-nreads", 1000); // 1000; - - final long nruns = getLongArg(args, "-nruns", 1); // 1000; - - final AtomicInteger nerr = new AtomicInteger(); - - for (int i = 0; i < nruns; i++) { - doMROWTest(store, nwrites, writeDelayMillis, 
timeout, nclients, - ntrials, reclen, nreads, nerr, true /*readAll*/); - - store.commit(); - - store = (new TestRWJournal()).reopenStore(store); - - System.out.println("Completed run: " + i); - } - - } finally { - - if (storeFile == null) - store.destroy(); - - } - - } - } /** @@ -3172,8 +3112,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id: TestRWJournal.java 4010 2010-12-16 12:44:43Z martyncutcher - * $ */ public static class TestMRMW extends AbstractMRMWTestCase { @@ -3208,8 +3146,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id: TestRWJournal.java 4010 2010-12-16 12:44:43Z martyncutcher - * $ */ public static class TestInterrupts extends AbstractInterruptsTestCase { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |