From: <mar...@us...> - 2011-05-17 15:50:23
|
Revision: 4513 http://bigdata.svn.sourceforge.net/bigdata/?rev=4513&view=rev Author: martyncutcher Date: 2011-05-17 15:50:17 +0000 (Tue, 17 May 2011) Log Message: ----------- Fix shadow allocation contexts and ensure that correct BTree checkpoint is used for IsolatedActionJournal in AbstractTask.getIndex Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/sector/TestMemoryManager.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2011-05-16 20:28:56 UTC (rev 4512) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2011-05-17 15:50:17 UTC (rev 4513) @@ -586,6 +586,8 @@ throw new NoSuchIndexException(name); } + + long checkpointAddr = entry.checkpointAddr; /* * Note: At this point we have an exclusive lock on the named @@ -628,14 +630,22 @@ * isolated. [Also, we do not want N2A to cache references to a * B+Tree backed by a different shadow journal.] */ - + if ((resourceManager.getLiveJournal().getBufferStrategy() instanceof RWStrategy)) { /* * Note: Do NOT use the name2Addr cache for the RWStore. * Each unisolated index view MUST be backed by a shadow * journal! + * + * But, fetch the btree from the cache to ensure we use the + * most recent checkpoint */ btree = null; + + final BTree tmpbtree = name2Addr.getIndexCache(name); + if (tmpbtree != null) + checkpointAddr = tmpbtree.getCheckpoint().getCheckpointAddr(); + } else { // recover from unisolated index cache. btree = name2Addr.getIndexCache(name); @@ -650,7 +660,7 @@ // re-load btree from the store. btree = BTree.load(// tmp, // backing store. - entry.checkpointAddr,// + checkpointAddr,// false// readOnly ); @@ -1086,9 +1096,11 @@ // clear the commit list. commitList.clear(); - // Detach the allocation context used by the operation. - ((IsolatedActionJournal) getJournal()).detachContext(); + // Detach the allocation context used by the operation, but increment + // txCount + ((IsolatedActionJournal) getJournal()).prepareCommit(); + final long elapsed = System.nanoTime() - begin; if(INFO) { @@ -1119,16 +1131,9 @@ * either the code path where the commit succeeds or the code path where it * fails. The boolean argument indicates whether or not the group commit * succeeded. Throws exceptions are trapped and logged. - * <p> - * Note: This method is NOT invoked if a task fails before joining the group - * commit. In that case, {@link #abortTask()} will be invoked. If the task - * succeeds then {@link #checkpointTask()} will be called and the task will - * eventually join a commit group. When the commit runs for that commit - * group, this method will be invoked on either the success or failure path - * for the group commit. */ void afterTaskHook(boolean abort) { - + ((IsolatedActionJournal) getJournal()).completeTask(); } /* @@ -2211,7 +2216,23 @@ } - /** + public void prepareCommit() { +// final IBufferStrategy bufferStrategy = delegate.getBufferStrategy(); +// if (bufferStrategy instanceof RWStrategy) { +// ((RWStrategy) bufferStrategy).getRWStore().activateTx(); +// } + // now detach! + detachContext(); + } + + public void completeTask() { + final IBufferStrategy bufferStrategy = delegate.getBufferStrategy(); + if (bufferStrategy instanceof RWStrategy) { + ((RWStrategy) bufferStrategy).getRWStore().deactivateTx(); + } + } + + /** * This class prevents {@link ITx#UNISOLATED} tasks from having direct * access to the {@link AbstractJournal} using * {@link AbstractTask#getJournal()}. @@ -2241,6 +2262,10 @@ final IBufferStrategy bufferStrategy = source.getBufferStrategy(); if (bufferStrategy instanceof RWStrategy) { + // must grab the tx BEFORE registering the context to correctly + // bracket, since the tx count is decremented AFTER the + // context is released + ((RWStrategy) bufferStrategy).getRWStore().activateTx(); ((RWStrategy) bufferStrategy).getRWStore().registerContext(this); } } @@ -2587,6 +2612,8 @@ public void abortContext() { delegate.abortContext(this); + + completeTask(); } public ScheduledFuture<?> addScheduledTask(final Runnable task, Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2011-05-16 20:28:56 UTC (rev 4512) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2011-05-17 15:50:17 UTC (rev 4513) @@ -140,8 +140,8 @@ public boolean freeBit(final int bit, final boolean sessionProtect) { if (!RWStore.tstBit(m_live, bit)) { - if (sessionProtect && RWStore.tstBit(m_transients, bit)) - return false; +// if (sessionProtect && RWStore.tstBit(m_transients, bit)) +// return false; throw new IllegalArgumentException("Freeing bit not set"); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-05-16 20:28:56 UTC (rev 4512) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2011-05-17 15:50:17 UTC (rev 4513) @@ -120,9 +120,15 @@ * where committed data is accessed even if has been marked as ready to * be recycled after the next commit */ + final long paddr = RWStore.convertAddr(block.m_addr) + ((long) m_size * bit); if (RWStore.tstBit(block.m_transients, bit)) { - return RWStore.convertAddr(block.m_addr) + ((long) m_size * bit); + return paddr; } else { + if (RWStore.tstBit(block.m_commit, bit)) { + throw new IllegalStateException("Address committed but not set in transients"); + } + m_store.showWriteCacheDebug(paddr); + return 0L; } } @@ -234,9 +240,8 @@ try { str.writeInt(m_size); -// if (!m_sessionActive) -// System.out.println("Committing allocator, protection: " + m_sessionActive + ", transient frees: " + m_freeTransients); - + boolean protectTransients = m_sessionActive || m_store.isSessionProtected(); + final Iterator<AllocBlock> iter = m_allocBlocks.iterator(); while (iter.hasNext()) { final AllocBlock block = iter.next(); @@ -246,7 +251,7 @@ str.writeInt(block.m_live[i]); } - if (!m_sessionActive) { + if (!protectTransients) { block.m_transients = block.m_live.clone(); } @@ -553,13 +558,8 @@ final int block = offset/nbits; m_sessionActive = m_store.isSessionProtected(); -// if (!m_sessionActive) { -// System.out.println("Freeing " + addr + " without protection"); -// } + try { -// if (!m_sessionActive) { -// System.out.println("NO SESSION PROTECT FOR " + addr + "[" + size + "]"); -// } if (((AllocBlock) m_allocBlocks.get(block)) .freeBit(offset % nbits, m_sessionActive && !overideSession)) { // bit adjust @@ -847,10 +847,6 @@ final boolean ret = !isCommitted(offset); -// if (ret) { -// System.out.println("CAN FREE " + addr + "[" + size + "]"); -// } - return ret; } else { return false; @@ -880,7 +876,7 @@ checkFreeList(); - m_sessionActive = false; + m_sessionActive = m_store.isSessionProtected(); } } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-05-16 20:28:56 UTC (rev 4512) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-05-17 15:50:17 UTC (rev 4513) @@ -1684,6 +1684,8 @@ assert(m_activeTxCount == 0 && m_contexts.isEmpty()); if (m_minReleaseAge == 0) { + if (log.isDebugEnabled()) + log.debug("RELEASE SESSIONS"); for (FixedAllocator fa : m_allocs) { fa.releaseSession(m_writeCache); } @@ -1990,7 +1992,7 @@ directWrite(pa, buf, 0, size, chk); } else { try { - m_writeCache.write(pa, ByteBuffer.wrap(buf, 0, size), chk); + m_writeCache.write(pa, ByteBuffer.wrap(buf, 0, size), chk); } catch (InterruptedException e) { throw new RuntimeException("Closed Store?", e); } @@ -3549,6 +3551,8 @@ if (alloc != null) { m_contextRemovals++; alloc.release(); + } else { + throw new IllegalStateException("Multiple call to detachContext"); } if (m_contexts.isEmpty() && this.m_activeTxCount == 0) { @@ -3578,6 +3582,7 @@ m_contextRemovals++; alloc.abort(); } + } finally { m_allocationLock.unlock(); } @@ -4462,6 +4467,9 @@ public void deactivateTx() { m_allocationLock.lock(); try { + if (m_activeTxCount == 0) { + throw new IllegalStateException("Tx count must be positive!"); + } m_activeTxCount--; if(log.isInfoEnabled()) log.info("#activeTx="+m_activeTxCount); @@ -4528,5 +4536,9 @@ } } + public void showWriteCacheDebug(long paddr) { + log.warn("WriteCacheDebug: " + paddr + " - " + m_writeCache.addrDebugInfo(paddr)); + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2011-05-16 20:28:56 UTC (rev 4512) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2011-05-17 15:50:17 UTC (rev 4513) @@ -826,6 +826,43 @@ } } + public void testAllocationContexts() { + + Journal store = (Journal) getStore(); + + try { + + RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy(); + + RWStore rw = bufferStrategy.getRWStore(); + + IAllocationContext cntxt1 = new IAllocationContext() {}; + + IAllocationContext cntxt2 = new IAllocationContext() {}; + + // allocate a global address + int gaddr = rw.alloc(412, null); + + store.commit(); + + // allocate a context address grabbing previous global allocation + int c1addr = rw.alloc(412, cntxt1); + // imagine we are re-allocating the original address + rw.free(gaddr, 412, cntxt1); + // now abort context + rw.abortContext(cntxt1); + + store.commit(); + + long paddr = rw.physicalAddress(gaddr); + + assertTrue("Global allocation must be protected", paddr != 0); + + } finally { + store.destroy(); + } + } + void showStore(Journal store) { RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/sector/TestMemoryManager.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/sector/TestMemoryManager.java 2011-05-16 20:28:56 UTC (rev 4512) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/sector/TestMemoryManager.java 2011-05-17 15:50:17 UTC (rev 4513) @@ -197,6 +197,8 @@ public void test_largeBlobAllocation() { final int sectorSize = manager.getSectorSize(); + + assertTrue(manager.getSlotBytes() == 0L); final long addr = manager.allocate(sectorSize, false/* blocks */); @@ -207,6 +209,10 @@ assertEquals("allocationSize", sectorSize, manager.allocationSize(addr)); + manager.free(addr); + + assertTrue(manager.getSlotBytes() == 0L); + } /** @@ -225,11 +231,11 @@ public void test_blockingAllocation() throws InterruptedException, ExecutionException, TimeoutException { - final int sectorSize = manager.getSectorSize(); + final int sectorSize = manager.getSectorSize(); // allow for blob // grab all the memory (for this size of allocation). final List<Long> addrs = new LinkedList<Long>(); - while(true) { + while (true) { try { final long addr = manager.allocate(sectorSize, false/* blocks */); addrs.add(addr); @@ -319,6 +325,8 @@ log.info("freeing: addr=" + addr + ", sizeof(addr)=" + manager.allocationSize(addr) + ", slotBytes=" + manager.getSlotBytes()); + + assertTrue(manager.getSlotBytes() == 0); } finally { @@ -341,6 +349,12 @@ log.info("Manager slotBytes: " + manager.getSlotBytes()); } + + public void test_stressBlockingAllocation() throws InterruptedException, ExecutionException, TimeoutException { + for (int i = 0; i < 50; i++) { + test_blockingAllocation(); + } + } /** * Unit test for reading a copy of the data from the {@link IMemoryManager}. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |