From: <mar...@us...> - 2010-11-01 12:52:07
|
Revision: 3852 http://bigdata.svn.sourceforge.net/bigdata/?rev=3852&view=rev Author: martyncutcher Date: 2010-11-01 12:52:00 +0000 (Mon, 01 Nov 2010) Log Message: ----------- 1) Change RWStrategy to pass the Journal reference on commit to remove need to set LocalTransactionService 2) Change deferred free to write to PSOutputStream to avoid maintaining inmemory references. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/AbstractInterruptsTestCase.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestWORMStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-11-01 12:32:36 UTC (rev 3851) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-11-01 12:52:00 UTC (rev 3852) @@ -628,7 +628,7 @@ } /** The default is a NOP. */ - public void commit() { + public void commit(IJournal journal) { // NOP for WORM. Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-11-01 12:32:36 UTC (rev 3851) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-11-01 12:52:00 UTC (rev 3852) @@ -1045,8 +1045,6 @@ ResourceManager.openJournal(getFile() == null ? null : getFile().toString(), size(), getBufferStrategy() .getBufferMode()); - this._bufferStrategy.setCommitRecordIndex(_commitRecordIndex); - } finally { lock.unlock(); @@ -2103,8 +2101,6 @@ // clear reference and reload from the store. _commitRecordIndex = _getCommitRecordIndex(); - _bufferStrategy.setCommitRecordIndex(_commitRecordIndex); - // clear the array of committers. _committers = new ICommitter[_committers.length]; @@ -2358,7 +2354,7 @@ * until commit, leading to invalid addresses for recent store * allocations. */ - _bufferStrategy.commit(); + _bufferStrategy.commit(this); /* * next offset at which user data would be written. @@ -2875,7 +2871,7 @@ * {@link #getCommitRecord(long)} to obtain a distinct instance * suitable for read-only access. */ - protected CommitRecordIndex getCommitRecordIndex() { + public CommitRecordIndex getCommitRecordIndex() { final ReadLock lock = _fieldReadWriteLock.readLock(); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java 2010-11-01 12:32:36 UTC (rev 3851) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java 2010-11-01 12:52:00 UTC (rev 3852) @@ -216,8 +216,9 @@ * A method that removes assumptions of how a specific strategy commits data. For most strategies the action is void * since the client WORM DISK strategy writes data as allocated. For the Read Write Strategy more data must be managed * as part of the protocol outside of the RootBlock, and this is the method that triggers that management. + * @param abstractJournal */ - public void commit(); + public void commit(IJournal journal); /** * A method that requires the implementation to discard its buffered write @@ -258,29 +259,4 @@ */ public boolean useChecksums(); - /** - * Needed to enable transaction support for standalone buffer strategies. - * - * The WORMStrategy does not need this since no data is ever deleted, but - * the RWStrategy must manage deletions such that no data is deleted until - * it can be guaranteed not to be accessed by existing transactions. - * - * @param localTransactionManager - * The transaction manager for the owning Journal - */ - public void setTransactionManager(AbstractLocalTransactionManager localTransactionManager); - - - /** - * Needed to enable transaction support for standalone buffer strategies. - * - * The WORMStrategy does not need this since no data is ever deleted, but - * the RWStrategy must manage deletions and needs access to the historical - * commitRecords which reference the blocks of deferred deleted addresses. - * - * @param commitRecordIndex - * The CommitRecordIndex for the owning Journal - */ - public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex); - } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-01 12:32:36 UTC (rev 3851) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-01 12:52:00 UTC (rev 3852) @@ -221,10 +221,8 @@ concurrencyManager = new ConcurrencyManager(properties, localTransactionManager, this); - getBufferStrategy().setTransactionManager(localTransactionManager); + } - } - // public void init() { // // super.init(); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-11-01 12:32:36 UTC (rev 3851) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-11-01 12:52:00 UTC (rev 3852) @@ -30,6 +30,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.locks.ReentrantLock; @@ -525,10 +526,10 @@ * * Must pass in earliestTxnTime to commitChanges to enable */ - public void commit() { + public void commit(IJournal journal) { m_commitLock.lock(); try { - m_store.commitChanges(); // includes a force(false) + m_store.commitChanges((Journal) journal); // includes a force(false) } finally { m_commitLock.unlock(); } @@ -546,6 +547,12 @@ public void force(boolean metadata) { try { m_store.flushWrites(metadata); + } catch (ClosedByInterruptException e) { + m_needsReopen = true; + + reopen(); // FIXME + + throw new RuntimeException(e); } catch (IOException e) { m_needsReopen = true; @@ -716,11 +723,6 @@ } - public void setTransactionManager(AbstractLocalTransactionManager localTransactionManager) { - this.localTransactionManager = localTransactionManager; - m_store.setTransactionService((JournalTransactionService) localTransactionManager.getTransactionService()); - } - public long getPhysicalAddress(long addr) { int rwaddr = decodeAddr(addr); @@ -737,9 +739,4 @@ public long saveDeleteBlocks() { return m_store.saveDeferrals(); } - - public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { - m_store.setCommitRecordIndex(commitRecordIndex); - } - } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-01 12:32:36 UTC (rev 3851) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-01 12:52:00 UTC (rev 3852) @@ -158,28 +158,29 @@ * resets private state variables for reuse of stream **/ void init(IStore store, int maxAlloc, IAllocationContext context) { + m_store = store; + m_context = context; + + m_blobThreshold = maxAlloc-4; // allow for checksum + if (m_buf == null || m_buf.length != m_blobThreshold) + m_buf = new byte[m_blobThreshold]; + + reset(); + } + + public void reset() { m_isSaved = false; m_headAddr = 0; m_prevAddr = 0; m_count = 0; m_bytesWritten = 0; - m_store = store; m_isSaved = false; - // m_blobThreshold = m_store.bufferChainOffset(); - m_blobThreshold = maxAlloc-4; // allow for checksum - m_buf = new byte[maxAlloc-4]; m_blobHeader = null; m_blobHdrIdx = 0; - - m_context = context; + } - - // FIXME: if autocommit then we should provide start/commit via init and save - // m_store.startTransaction(); - } - /**************************************************************** * write a single byte * Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-01 12:32:36 UTC (rev 3851) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-01 12:52:00 UTC (rev 3852) @@ -52,6 +52,7 @@ import com.bigdata.io.IReopenChannel; import com.bigdata.io.writecache.BufferedWrite; import com.bigdata.io.writecache.WriteCache; +import com.bigdata.journal.Journal; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.CommitRecordIndex; import com.bigdata.journal.ForceEnum; @@ -323,8 +324,7 @@ private static final int MAX_DEFERRED_FREE = 4094; // fits in 16k block volatile long m_lastDeferredReleaseTime = 23; // zero is invalid time final ArrayList<Integer> m_currentTxnFreeList = new ArrayList<Integer>(); - - volatile long deferredFreeCount = 0; + final PSOutputStream m_deferredFreeOut; private ReopenFileChannel m_reopener = null; @@ -490,7 +490,7 @@ m_maxFixedAlloc = m_allocSizes[m_allocSizes.length-1]*64; m_minFixedAlloc = m_allocSizes[0]*64; - commitChanges(); + commitChanges(null); } else { @@ -502,6 +502,7 @@ assert m_maxFixedAlloc > 0; + m_deferredFreeOut = PSOutputStream.getNew(this, m_maxFixedAlloc, null); } catch (IOException e) { throw new StorageTerminalError("Unable to initialize store", e); } @@ -1213,8 +1214,6 @@ private volatile long m_maxAllocation = 0; private volatile long m_spareAllocation = 0; - - private CommitRecordIndex m_commitRecordIndex; public int alloc(final int size, IAllocationContext context) { if (size > m_maxFixedAlloc) { @@ -1569,7 +1568,7 @@ return "RWStore " + s_version; } - public void commitChanges() { + public void commitChanges(Journal journal) { checkCoreAllocations(); // take allocation lock to prevent other threads allocating during commit @@ -1577,7 +1576,7 @@ try { - checkDeferredFrees(true); // free now if possible + checkDeferredFrees(true, journal); // free now if possible // Allocate storage for metaBits long oldMetaBits = m_metaBitsAddr; @@ -1653,30 +1652,26 @@ * Called prior to commit, so check whether storage can be freed and then * whether the deferredheader needs to be saved. */ - private void checkDeferredFrees(boolean freeNow) { - if (freeNow) checkFreeable(); - - // Commit can be called prior to Journal initialisation, in which case - // the commitRecordIndex will not be set. - if (m_commitRecordIndex == null) { - return; + public void checkDeferredFrees(boolean freeNow, Journal journal) { + if (journal != null) { + final JournalTransactionService transactionService = (JournalTransactionService) journal.getLocalTransactionManager().getTransactionService(); + + // Commit can be called prior to Journal initialisation, in which case + // the commitRecordIndex will not be set. + final CommitRecordIndex commitRecordIndex = journal.getCommitRecordIndex(); + + long latestReleasableTime = System.currentTimeMillis(); + + if (transactionService != null) { + latestReleasableTime -= transactionService.getMinReleaseAge(); + } + + Iterator<ICommitRecord> records = commitRecordIndex.getCommitRecords(m_lastDeferredReleaseTime, latestReleasableTime); + + freeDeferrals(records); } - - long latestReleasableTime = System.currentTimeMillis(); - - if (m_transactionService != null) { - latestReleasableTime -= m_transactionService.getMinReleaseAge(); - } - - Iterator<ICommitRecord> records = m_commitRecordIndex.getCommitRecords(m_lastDeferredReleaseTime, latestReleasableTime); - - freeDeferrals(records); } - - public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { - m_commitRecordIndex = commitRecordIndex; - } /** * * @return conservative requirement for metabits storage, mindful that the @@ -2654,45 +2649,39 @@ * * The deferred list is checked on AllocBlock and prior to commit. * - * There is also a possibility to check for deferral at this point, since - * we are passed both the currentCommitTime - against which this free - * will be deferred and the earliest tx start time against which we - * can check to see if + * DeferredFrees are written to the deferred PSOutputStream */ public void deferFree(int rwaddr, int sze) { m_deferFreeLock.lock(); try { - deferredFreeCount++; - m_currentTxnFreeList.add(rwaddr); + m_deferredFreeOut.writeInt(rwaddr); final Allocator alloc = getBlockByAddress(rwaddr); if (alloc instanceof BlobAllocator) { - m_currentTxnFreeList.add(sze); + m_deferredFreeOut.writeInt(sze); } - - // every so many deferrals, check for free - if (false && deferredFreeCount % 1000 == 0) { - checkFreeable(); - } + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); } finally { m_deferFreeLock.unlock(); } } - private void checkFreeable() { - if (m_transactionService == null) { + private void checkFreeable(final JournalTransactionService transactionService) { + if (transactionService == null) { return; } try { - Long freeTime = m_transactionService.tryCallWithLock(new Callable<Long>() { + Long freeTime = transactionService.tryCallWithLock(new Callable<Long>() { public Long call() throws Exception { long now = System.currentTimeMillis(); - long earliest = m_transactionService.getEarliestTxStartTime(); - long aged = now - m_transactionService.getMinReleaseAge(); + long earliest = transactionService.getEarliestTxStartTime(); + long aged = now - transactionService.getMinReleaseAge(); - if (m_transactionService.getActiveCount() == 0) { + if (transactionService.getActiveCount() == 0) { return aged; } else { return aged < earliest ? aged : earliest; @@ -2700,8 +2689,6 @@ } }, 5L, TimeUnit.MILLISECONDS); - - freeCurrentDeferrals(freeTime); } catch (RuntimeException e) { // fine, will try again later } catch (Exception e) { @@ -2711,38 +2698,6 @@ /** - * Frees all storage deferred against a txn release time less than that - * passed in. - * - * @param txnRelease - the max release time - */ - protected void freeCurrentDeferrals(long txnRelease) { - - m_deferFreeLock.lock(); - try { - if (m_rb.getLastCommitTime() <= txnRelease) { -// System.out.println("freeCurrentDeferrals"); - final Iterator<Integer> curdefers = m_currentTxnFreeList.iterator(); - while (curdefers.hasNext()) { - final int rwaddr = curdefers.next(); - Allocator alloc = getBlock(rwaddr); - if (alloc instanceof BlobAllocator) { - // if this is a Blob then the size is required - assert curdefers.hasNext(); - - immediateFree(rwaddr, curdefers.next()); - } else { - immediateFree(rwaddr, 0); // size ignored for FixedAllocators - } - } - m_currentTxnFreeList.clear(); - } - } finally { - m_deferFreeLock.unlock(); - } - } - - /** * Writes the content of currentTxnFreeList to the store. * * These are the current buffered frees that have yet been saved into @@ -2751,39 +2706,27 @@ * @return the address of the deferred addresses saved on the store */ public long saveDeferrals() { - final byte[] buf; m_deferFreeLock.lock(); try { - int addrCount = m_currentTxnFreeList.size(); - - if (addrCount == 0) { - return 0L; + if (m_deferredFreeOut.getBytesWritten() == 0) { + return 0; } + m_deferredFreeOut.writeInt(0); // terminate! + int outlen = m_deferredFreeOut.getBytesWritten(); + + long addr = m_deferredFreeOut.save(); + + addr <<= 32; + addr += outlen; + + m_deferredFreeOut.reset(); - buf = new byte[4 * (addrCount + 1)]; - ByteBuffer out = ByteBuffer.wrap(buf); - out.putInt(addrCount); - for (int i = 0; i < addrCount; i++) { - out.putInt(m_currentTxnFreeList.get(i)); - } - - // now we've saved it to the store, clear the list - m_currentTxnFreeList.clear(); - + return addr; + } catch (IOException e) { + throw new RuntimeException("Cannot write to deferred free", e); } finally { m_deferFreeLock.unlock(); } - - long rwaddr = alloc(buf, buf.length, null); - if (log.isTraceEnabled()) { - long paddr = physicalAddress((int) rwaddr); - log.trace("Saving deferred free block at " + paddr); - } - - rwaddr <<= 32; - rwaddr += buf.length; - - return rwaddr; } /** @@ -2802,19 +2745,21 @@ final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); m_allocationLock.lock(); try { - int addrs = strBuf.readInt(); + int nxtAddr = strBuf.readInt(); - while (addrs-- > 0) { // while (false && addrs-- > 0) { - int nxtAddr = strBuf.readInt(); + while (nxtAddr != 0) { // while (false && addrs-- > 0) { Allocator alloc = getBlock(nxtAddr); if (alloc instanceof BlobAllocator) { - assert addrs > 0; // a Blob address MUST have a size - --addrs; - immediateFree(nxtAddr, strBuf.readInt()); + int bloblen = strBuf.readInt(); + assert bloblen > 0; // a Blob address MUST have a size + + immediateFree(nxtAddr, bloblen); } else { immediateFree(nxtAddr, 0); // size ignored for FreeAllocators } + + nxtAddr = strBuf.readInt(); } m_lastDeferredReleaseTime = lastReleaseTime; } catch (IOException e) { @@ -2840,11 +2785,6 @@ } } - private JournalTransactionService m_transactionService = null; - public void setTransactionService(final JournalTransactionService transactionService) { - this.m_transactionService = transactionService; - } - /** * The ContextAllocation object manages a freeList of associated allocators * and an overall list of allocators. When the context is detached, all Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/AbstractInterruptsTestCase.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/AbstractInterruptsTestCase.java 2010-11-01 12:32:36 UTC (rev 3851) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/AbstractInterruptsTestCase.java 2010-11-01 12:52:00 UTC (rev 3852) @@ -314,7 +314,7 @@ } else if (store instanceof RWStrategy) { RWStrategy rws = (RWStrategy)store; - rws.commit(); + rws.commit(null); } try { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestWORMStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestWORMStrategy.java 2010-11-01 12:32:36 UTC (rev 3851) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestWORMStrategy.java 2010-11-01 12:52:00 UTC (rev 3852) @@ -240,7 +240,7 @@ properties.setProperty(Options.WRITE_CACHE_ENABLED, "" + writeCacheEnabled); - return new Journal(properties).getBufferStrategy(); + return new Journal(properties); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2010-11-01 12:32:36 UTC (rev 3851) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2010-11-01 12:52:00 UTC (rev 3852) @@ -849,8 +849,10 @@ // since deferred frees, we must commit in order to ensure the // address in invalid, indicating it is available for - bs.commit(); + store.commit(); + rw.checkDeferredFrees(true, store); + try { rdBuf = bs.read(faddr); // should fail with illegal state throw new RuntimeException("Fail"); @@ -1147,7 +1149,8 @@ properties.setProperty(Options.WRITE_CACHE_ENABLED, "" + writeCacheEnabled); - return new Journal(properties).getBufferStrategy(); + // return new Journal(properties).getBufferStrategy(); + return new Journal(properties); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |