[Bigdata-commit] SF.net SVN: bigdata:[3398] branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <mar...@us...> - 2010-08-03 10:26:47
Revision: 3398
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3398&view=rev
Author: martyncutcher
Date: 2010-08-03 10:26:41 +0000 (Tue, 03 Aug 2010)

Log Message:
-----------
Fix addressing errors for the deferredFree list and temporarily back off the call of deferFree from RWStrategy to avoid a build timeout until we can solve the problem.

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java	2010-08-03 00:14:08 UTC (rev 3397)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java	2010-08-03 10:26:41 UTC (rev 3398)
@@ -1635,11 +1635,10 @@
             final long offset = entry.getKey(); // offset in file to update
-            registerWriteStatus(offset, md.recordLength, 'W');
-
             nwrites += FileChannelUtility.writeAll(opener, view, offset);
 //            if (log.isInfoEnabled())
 //                log.info("writing to: " + offset);
+            registerWriteStatus(offset, md.recordLength, 'W');
         }
 
         final WriteCacheCounters counters = this.counters.get();

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java	2010-08-03 00:14:08 UTC (rev 3397)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java	2010-08-03 10:26:41 UTC (rev 3398)
@@ -307,15 +307,16 @@
         final int rwaddr = decodeAddr(addr);
         final int sze = decodeSize(addr);
 
-        if (service == null) {
+        // FIXME: need to decide on correct way to handle transaction oriented
+        // allocations
+        if (true || service == null) {
             m_store.free(rwaddr, sze);
         } else {
             /*
              * May well be better to always defer and then free in batch,
              * but for now need to confirm transaction logic
              */
-            m_store.deferFree(rwaddr, sze, service
-                    .getLastCommitTime());
+            m_store.deferFree(rwaddr, sze, m_rb.getLastCommitTime());
         }
     }

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-08-03 00:14:08 UTC (rev 3397)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-08-03 10:26:41 UTC (rev 3398)
@@ -968,7 +968,7 @@
             String cacheDebugInfo = m_writeCache.addrDebugInfo(paddr);
             log.warn("Invalid data checksum for addr: " + paddr + ", chk: " + chk + ", tstchk: " + tstchk + ", length: " + length
-                    + ", first byte: " + buf[0] + ", successful reads: " + m_diskReads
+                    + ", first bytes: " + toHexString(buf, 32) + ", successful reads: " + m_diskReads
                     + ", at last extend: " + m_readsAtExtend + ", cacheReads: " + m_cacheReads
                     + ", writeCacheDebug: " + cacheDebugInfo);
@@ -987,6 +987,25 @@
         }
     }
 
+    static final char[] HEX_CHAR_TABLE = {
+        '0', '1','2','3',
+        '4','5','6','7',
+        '8','9','a','b',
+        'c','d','e','f'
+    };
+
+    // utility to display byte array of maximum i bytes as hexString
+    private String toHexString(byte[] buf, int n) {
+        n = n < buf.length ? n : buf.length;
+        StringBuffer out = new StringBuffer();
+        for (int i = 0; i < n; i++) {
+            int v = buf[i] & 0xFF;
+            out.append(HEX_CHAR_TABLE[v >>> 4]);
+            out.append(HEX_CHAR_TABLE[v &0xF]);
+        }
+        return out.toString();
+    }
+
     /**
      * FIXME: This method is not currently used with BigData, if needed then
      * the address mangling needs re-working
@@ -1492,14 +1511,6 @@
             bb.flip();
             m_deferredFreeListAddr = (int) alloc(deferBuf, addrSize);
             m_deferredFreeListEntries = addrs;
-            final int chk = ChecksumUtility.getCHK().checksum(deferBuf);
-            try {
-                m_writeCache.write(physicalAddress(m_deferredFreeListAddr), bb, chk);
-            } catch (IllegalStateException e) {
-                throw new RuntimeException(e);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
             free(oldDeferredFreeListAddr, 0);
         } else {
             m_deferredFreeListAddr = 0;
@@ -2658,10 +2669,14 @@
         m_currentTxnFreeList.clear();
 
         long rwaddr = alloc(buf, buf.length);
+        long paddr = physicalAddress((int) rwaddr);
         rwaddr <<= 32;
-        rwaddr += addrCount;
+        rwaddr += buf.length;
+        if (log.isTraceEnabled())
+            log.trace("saveDeferrals: " + paddr + ", size: " + buf.length);
+
         // Now add the reference of this block
         m_deferredFreeList.add(m_lastTxReleaseTime);
         m_deferredFreeList.add(rwaddr);
@@ -2675,9 +2690,12 @@
      * @param blockAddr
      */
     protected void freeDeferrals(long blockAddr) {
-        int addr = (int) -(blockAddr >> 32);
+        int addr = (int) (blockAddr >> 32);
         int sze = (int) blockAddr & 0xFFFFFF;
+        if (log.isTraceEnabled())
+            log.trace("freeDeferrals at " + physicalAddress(addr) + ", size: " + sze);
+
         byte[] buf = new byte[sze+4]; // allow for checksum
         getData(addr, buf);
         final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf));
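The addressing fix above changes how a deferred-free block reference is packed: saveDeferrals() now shifts the allocation address into the high 32 bits and stores the serialized block length (rather than the address count) in the low bits, and freeDeferrals() recovers the address without the previous negation. The standalone sketch below only illustrates that bit layout; the class and method names are invented, and only the shift/mask logic mirrors the committed RWStore code.

// Sketch of the deferred-free block reference encoding after r3398. The class
// and method names here are illustrative; only the bit layout follows the
// committed RWStore.saveDeferrals()/freeDeferrals() logic.
public class DeferredFreeRef {

    /** Pack the store address (high 32 bits) and the block length (low bits). */
    static long encode(final long rwaddr, final int blockLength) {
        long ref = rwaddr;
        ref <<= 32;          // allocation address in the high word
        ref += blockLength;  // serialized block length in bytes, not the address count
        return ref;
    }

    /** Recover the store address; note it is no longer negated as in the pre-fix code. */
    static int decodeAddr(final long ref) {
        return (int) (ref >> 32);
    }

    /** Recover the block length from the low 24 bits, as freeDeferrals() now does. */
    static int decodeSize(final long ref) {
        return (int) ref & 0xFFFFFF;
    }

    public static void main(final String[] args) {
        final long ref = encode(-12345, 512);
        System.out.println(decodeAddr(ref) + " / " + decodeSize(ref)); // prints -12345 / 512
    }
}

Keeping the address and length in a single long lets the deferred-free list record one entry per block alongside its release time, as the saveDeferrals() hunk above does.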
[Bigdata-commit] SF.net SVN: bigdata:[3477] branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <mar...@us...> - 2010-09-01 14:19:14
Revision: 3477 http://bigdata.svn.sourceforge.net/bigdata/?rev=3477&view=rev Author: martyncutcher Date: 2010-09-01 14:19:05 +0000 (Wed, 01 Sep 2010) Log Message: ----------- Commit inital implementaitons of shadow allocations and CommitRecord-based deleteBlocks Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAtomicStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/AbstractRawStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAllocationContext.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalShadow.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -862,12 +862,13 @@ // Clear the old key. data.childAddr[i] = NULL; - if(btree.storeCache!=null) { + if (btree.storeCache!=null) { // remove from cache. btree.storeCache.remove(oldChildAddr); } // free the oldChildAddr if the Strategy supports it - btree.store.delete(oldChildAddr); + if (true) btree.store.delete(oldChildAddr); + // System.out.println("Deleting " + oldChildAddr); // Stash reference to the new child. 
// childRefs[i] = btree.newRef(newChild); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -1436,7 +1436,7 @@ final WriteCache cache = acquireForWriter(); try { - debugAddrs(offset, 0, 'A'); + debugAddrs(offset, data.remaining(), 'A'); // write on the cache. if (cache.write(offset, data, chk, useChecksum)) { @@ -1982,7 +1982,7 @@ } if (addrsUsed[i] == paddr) { ret.append(addrActions[i]); - if (addrActions[i]=='W') { + if (addrActions[i]=='A') { ret.append("[" + addrLens[i] + "]"); } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -677,4 +677,8 @@ public void setTransactionManager(AbstractLocalTransactionManager localTransactionManager) { // NOP } + + public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { + // NOP + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -37,6 +37,7 @@ import java.nio.ByteBuffer; import java.nio.channels.Channel; import java.nio.channels.FileChannel; +import java.util.Iterator; import java.util.Properties; import java.util.UUID; import java.util.concurrent.Callable; @@ -222,6 +223,18 @@ public static transient final int ROOT_NAME2ADDR = 0; /** + * The index of the address where the root block copy from the previous + * commit is stored + */ + public static transient final int PREV_ROOTBLOCK = 1; + + /** + * The index of the address of the delete blocks associated with + * this transaction + */ + public static transient final int DELETEBLOCK = 2; + + /** * A clone of the properties used to initialize the {@link Journal}. */ final protected Properties properties; @@ -520,6 +533,73 @@ } /** + * Return the root block view associated with the commitRecord for the + * provided commit time. This requires accessing the next commit record + * since the previous root block is stored with each record. + * + * @param commitTime + * A commit time. + * + * @return The root block view -or- <code>null</code> if there is no commit + * record for that commitTime. 
+ * + */ + public IRootBlockView getRootBlock(final long commitTime) { + + final ICommitRecord commitRecord = getCommitRecordIndex().findNext(commitTime); + + if (commitRecord == null) { + return null; + } + + final long rootBlockAddr = commitRecord.getRootAddr(PREV_ROOTBLOCK); + + if (rootBlockAddr == 0) { + return null; + } else { + ByteBuffer bb = read(rootBlockAddr); + + return new RootBlockView(true /* rb0 - WTH */, bb, checker); + } + + } + + /** + * + * @param startTime from which to begin iteration + * + * @return an iterator over the committed root blocks + */ + public Iterator<IRootBlockView> getRootBlocks(final long startTime) { + return new Iterator<IRootBlockView>() { + ICommitRecord commitRecord = getCommitRecordIndex().findNext(startTime); + + public boolean hasNext() { + return commitRecord != null; + } + + public IRootBlockView next() { + final long rootBlockAddr = commitRecord.getRootAddr(PREV_ROOTBLOCK); + + commitRecord = getCommitRecordIndex().findNext(commitRecord.getTimestamp()); + + if (rootBlockAddr == 0) { + return null; + } else { + ByteBuffer bb = read(rootBlockAddr); + + return new RootBlockView(true /* rb0 - WTH */, bb, checker); + } + } + + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + + /** * True iff the journal was opened in a read-only mode. */ private final boolean readOnly; @@ -905,6 +985,8 @@ _bufferStrategy = new RWStrategy(fileMetadata, quorum); this._rootBlock = fileMetadata.rootBlock; + + setCommitter(DELETEBLOCK, new DeleteBlockCommitter((RWStrategy) _bufferStrategy)); break; @@ -961,6 +1043,8 @@ // report event. ResourceManager.openJournal(getFile() == null ? null : getFile().toString(), size(), getBufferStrategy() .getBufferMode()); + + this._bufferStrategy.setCommitRecordIndex(_commitRecordIndex); } finally { @@ -2017,6 +2101,8 @@ // clear reference and reload from the store. _commitRecordIndex = _getCommitRecordIndex(); + + _bufferStrategy.setCommitRecordIndex(_commitRecordIndex); // clear the array of committers. _committers = new ICommitter[_committers.length]; @@ -2509,7 +2595,7 @@ public long write(final ByteBuffer data) { - assertCanRead(); + assertCanWrite(); return _bufferStrategy.write(data); @@ -2524,7 +2610,15 @@ } - // Note: NOP for WORM. Used by RW for eventual recycle protocol. + public long write(ByteBuffer data, final long oldAddr, IAllocationContext context) { + return _bufferStrategy.write(data, oldAddr, context); + } + + public long write(ByteBuffer data, IAllocationContext context) { + return _bufferStrategy.write(data, context); + } + + // Note: NOP for WORM. Used by RW for eventual recycle protocol. 
public void delete(final long addr) { assertCanWrite(); @@ -2533,6 +2627,18 @@ } + public void delete(final long addr, IAllocationContext context) { + + assertCanWrite(); + + _bufferStrategy.delete(addr, context); + + } + + public void detachContext(IAllocationContext context) { + _bufferStrategy.detachContext(context); + } + final public long getRootAddr(final int index) { final ReadLock lock = _fieldReadWriteLock.readLock(); @@ -2660,6 +2766,11 @@ */ setupName2AddrBTree(getRootAddr(ROOT_NAME2ADDR)); + + /** + * Register committer to write previous root block + */ + setCommitter(PREV_ROOTBLOCK, new RootBlockCommitter(this)); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -34,6 +34,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -2483,10 +2484,33 @@ } public void delete(long addr) { - // void - + delegate.delete(addr); } + public IRootBlockView getRootBlock(long commitTime) { + return delegate.getRootBlock(commitTime); + } + + public Iterator<IRootBlockView> getRootBlocks(long startTime) { + return delegate.getRootBlocks(startTime); + } + + public void delete(long addr, IAllocationContext context) { + delegate.delete(addr, context); + } + + public long write(ByteBuffer data, IAllocationContext context) { + return delegate.write(data, context); + } + + public long write(ByteBuffer data, long oldAddr, IAllocationContext context) { + return delegate.write(data, oldAddr, context); + } + + public void detachContext(IAllocationContext context) { + delegate.detachContext(context); + } + } /** @@ -2861,6 +2885,30 @@ } + public IRootBlockView getRootBlock(long commitTime) { + return delegate.getRootBlock(commitTime); + } + + public Iterator<IRootBlockView> getRootBlocks(long startTime) { + return delegate.getRootBlocks(startTime); + } + + public void delete(long addr, IAllocationContext context) { + throw new UnsupportedOperationException(); + } + + public long write(ByteBuffer data, IAllocationContext context) { + throw new UnsupportedOperationException(); + } + + public long write(ByteBuffer data, long oldAddr, IAllocationContext context) { + throw new UnsupportedOperationException(); + } + + public void detachContext(IAllocationContext context) { + delegate.detachContext(context); + } + } /** Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -26,6 +26,8 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.UUID; import com.bigdata.btree.BTree; @@ -696,4 +698,34 @@ } // CommitRecordIndexTupleSerializer + public Iterator<ICommitRecord> getCommitRecords(final long fromTime, final long toTime) { + return new 
Iterator<ICommitRecord>() { + ICommitRecord m_next = findNext(fromTime); + + public boolean hasNext() { + return m_next != null; + } + + public ICommitRecord next() { + if (m_next == null) { + throw new NoSuchElementException(); + } + + ICommitRecord ret = m_next; + m_next = findNext(ret.getTimestamp()); + + if (m_next != null && m_next.getTimestamp() > toTime) { + m_next = null; + } + + return ret; + } + + public void remove() { + throw new RuntimeException("Invalid Operation"); + } + + }; + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -2559,5 +2559,9 @@ public void setNextOffset(long lastOffset) { // void for standard Disk strategy } + + public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { + // NOP + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -1245,8 +1245,8 @@ } - if (log.isDebugEnabled()) - log.debug("Writing ROOTBLOCK with commitCounter: " + rootBlock.getCommitCounter() + if (log.isTraceEnabled()) + log.trace("Writing ROOTBLOCK with commitCounter: " + rootBlock.getCommitCounter() + ", commitRecordIndexAddr: " + rootBlock.getCommitRecordIndexAddr() + ", commitRecordAddr: " + rootBlock.getCommitRecordAddr()); } Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAllocationContext.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAllocationContext.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAllocationContext.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -0,0 +1,41 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.journal; + +/** + * An IAllocationContext defines a shadow environment which may be + * associated with allocations made during a transaction. 
+ * + * @author Martyn Cutcher + * + */ +public interface IAllocationContext extends Comparable { + + /** + * @return the minimum release time for any freed allocations + */ + long minimumReleaseTime(); + +} Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAllocationContext.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAtomicStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAtomicStore.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAtomicStore.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -27,6 +27,8 @@ package com.bigdata.journal; +import java.util.Iterator; + import com.bigdata.rawstore.IRawStore; /** @@ -111,4 +113,24 @@ */ public ICommitRecord getCommitRecord(long timestamp); + /** + * Return the root block view associated with the commitRecord for the + * provided commit time. This requires accessing the next commit record + * since it is the previous root block that is referenced from each record. + * + * @param commitTime + * A commit time. + * + * @return The root block view -or- <code>null</code> if there is no commit + * record for that commitTime. + */ + public IRootBlockView getRootBlock(final long commitTime); + + /** + * + * @param startTime from which to begin iteration + * + * @return an iterator over the committed root blocks + */ + public Iterator<IRootBlockView> getRootBlocks(final long startTime); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -270,4 +270,17 @@ */ public void setTransactionManager(AbstractLocalTransactionManager localTransactionManager); + + /** + * Needed to enable transaction support for standalone buffer strategies. + * + * The WORMStrategy does not need this since no data is ever deleted, but + * the RWStrategy must manage deletions and needs access to the historical + * commitRecords which reference the blocks of deferred deleted addresses. + * + * @param commitRecordIndex + * The CommitRecordIndex for the owning Journal + */ + public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex); + } Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -0,0 +1,245 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.journal; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutorService; + +import com.bigdata.bfs.BigdataFileSystem; +import com.bigdata.btree.BTree; +import com.bigdata.btree.IIndex; +import com.bigdata.btree.IndexMetadata; +import com.bigdata.counters.CounterSet; +import com.bigdata.mdi.IResourceMetadata; +import com.bigdata.relation.locator.IResourceLocator; +import com.bigdata.sparse.SparseRowStore; + +public class JournalDelegate implements IJournal { + final IJournal delegate; + + public JournalDelegate(final IJournal source) { + this.delegate = source; + } + + public Properties getProperties() { + return delegate.getProperties(); + } + + public void shutdown() { + delegate.shutdown(); + } + + public void shutdownNow() { + delegate.shutdownNow(); + } + + public void abort() { + delegate.abort(); + } + + public long commit() { + return delegate.commit(); + } + + public ICommitRecord getCommitRecord(long timestamp) { + return delegate.getCommitRecord(timestamp); + } + + public long getRootAddr(int index) { + return delegate.getRootAddr(index); + } + + public IRootBlockView getRootBlock(long commitTime) { + return delegate.getRootBlock(commitTime); + } + + public IRootBlockView getRootBlockView() { + return delegate.getRootBlockView(); + } + + public Iterator<IRootBlockView> getRootBlocks(long startTime) { + return delegate.getRootBlocks(startTime); + } + + public void setCommitter(int index, ICommitter committer) { + delegate.setCommitter(index, committer); + } + + public void close() { + delegate.close(); + } + + public void delete(long addr) { + delegate.delete(addr); + } + + public void deleteResources() { + delegate.deleteResources(); + } + + public void destroy() { + delegate.destroy(); + } + + public void force(boolean metadata) { + delegate.force(metadata); + } + + public CounterSet getCounters() { + return delegate.getCounters(); + } + + public File getFile() { + return delegate.getFile(); + } + + public IResourceMetadata getResourceMetadata() { + return delegate.getResourceMetadata(); + } + + public UUID getUUID() { + return delegate.getUUID(); + } + + public boolean isFullyBuffered() { + return delegate.isFullyBuffered(); + } + + public boolean isOpen() { + return delegate.isOpen(); + } + + public boolean isReadOnly() { + return delegate.isOpen(); + } + + public boolean isStable() { + return delegate.isStable(); + } + + public ByteBuffer read(long addr) { + return delegate.read(addr); + } + + public long size() { + return delegate.size(); + } + + public long write(ByteBuffer data) { + return delegate.write(data); + } + + public long write(ByteBuffer data, long oldAddr) { + return delegate.write(data, oldAddr); + } + + public int getByteCount(long addr) { + return delegate.getByteCount(addr); + } + + public long getOffset(long addr) { + return delegate.getOffset(addr); + } + + public long toAddr(int nbytes, long offset) { + return delegate.toAddr(nbytes, offset); + } + + public String toString(long 
addr) { + return delegate.toString(addr); + } + + public IIndex getIndex(String name) { + return delegate.getIndex(name); + } + + public IIndex registerIndex(String name, BTree btree) { + return delegate.registerIndex(name, btree); + } + + public IIndex registerIndex(String name, IndexMetadata indexMetadata) { + return delegate.registerIndex(name, indexMetadata); + } + + public void dropIndex(String name) { + delegate.dropIndex(name); + } + + public void registerIndex(IndexMetadata indexMetadata) { + delegate.registerIndex(indexMetadata); + } + + public ExecutorService getExecutorService() { + return delegate.getExecutorService(); + } + + public BigdataFileSystem getGlobalFileSystem() { + return delegate.getGlobalFileSystem(); + } + + public SparseRowStore getGlobalRowStore() { + return delegate.getGlobalRowStore(); + } + + public IIndex getIndex(String name, long timestamp) { + return delegate.getIndex(name, timestamp); + } + + public long getLastCommitTime() { + return delegate.getLastCommitTime(); + } + + public IResourceLocator getResourceLocator() { + return delegate.getResourceLocator(); + } + + public IResourceLockService getResourceLockService() { + return delegate.getResourceLockService(); + } + + public TemporaryStore getTempStore() { + return delegate.getTempStore(); + } + + public void delete(long addr, IAllocationContext context) { + delegate.delete(addr, context); + } + + public long write(ByteBuffer data, IAllocationContext context) { + return delegate.write(data, context); + } + + public long write(ByteBuffer data, long oldAddr, IAllocationContext context) { + return delegate.write(data, oldAddr, context); + } + + public void detachContext(IAllocationContext context) { + delegate.detachContext(context); + } +} Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalShadow.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalShadow.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalShadow.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -0,0 +1,85 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.journal; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A JournalShadow wraps a Journal as a JournalDelegate but provides itself + * as the allocation context to be passed through to any interested + * BufferStrategy. 
+ * + * This is the path by which RWStore allocators are provided the context for + * the allocations and deletes made + * + * @author Martyn Cutcher + * + */ +public class JournalShadow extends JournalDelegate implements IAllocationContext { + static AtomicLong s_idCounter = new AtomicLong(23); + int m_id = (int) s_idCounter.incrementAndGet(); + + public JournalShadow(IJournal source) { + super(source); + } + + public long write(ByteBuffer data) { + return delegate.write(data, this); + } + + public long write(ByteBuffer data, long oldAddr) { + return delegate.write(data, oldAddr, this); + } + + public void delete(long oldAddr) { + delegate.delete(oldAddr, this); + } + + public int compareTo(Object o) { + if (o instanceof JournalShadow) { + JournalShadow js = (JournalShadow) o; + return m_id - js.m_id; + } else { + return -1; + } + } + + /** + * TODO: should retrieve from localTransactionService or Journal + * properties + */ + public long minimumReleaseTime() { + return 0; + } + + /** + * Release itself from the wrapped Journal, this unlocks the allocator for + * the RWStore + */ + public void detach() { + delegate.detachContext(this); + } +} Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalShadow.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import com.bigdata.service.AbstractFederation; import com.bigdata.service.AbstractTransactionService; @@ -514,4 +515,25 @@ } } + /** + * Invoke a method with the {@link AbstractTransactionService}'s lock held. + * + * But throw immediate exception if try fails. 
+ * + * @param <T> + * @param callable + * @return + * @throws Exception + */ + public <T> T tryCallWithLock(final Callable<T> callable, long waitFor, TimeUnit unit) throws Exception { + if (!lock.tryLock(waitFor,unit)) { + throw new RuntimeException("Lock not available"); + } + try { + return callable.call(); + } finally { + lock.unlock(); + } + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -81,6 +81,8 @@ final private FileMetadataView m_fmv = new FileMetadataView(); + private volatile boolean m_open = false; + private volatile IRootBlockView m_rb; private volatile IRootBlockView m_rb0; private volatile IRootBlockView m_rb1; @@ -110,6 +112,7 @@ m_rb = fileMetadata.rootBlock; m_store = new RWStore(m_fmv, false, quorum); // not read-only for now + m_open = true; m_rb0 = copyRootBlock(true); m_rb1 = copyRootBlock(false); @@ -233,6 +236,10 @@ } public long write(ByteBuffer data) { + return write(data, null); + } + + public long write(ByteBuffer data, IAllocationContext context) { checkReopen(); if (data == null) { @@ -246,7 +253,7 @@ } try { - long rwaddr = m_store.alloc(data.array(), nbytes); + long rwaddr = m_store.alloc(data.array(), nbytes, context); data.position(nbytes); // update position to end of buffer long retaddr = encodeAddr(rwaddr, nbytes); @@ -296,30 +303,27 @@ return (int) (addr & 0xFFFFFFFF); } + public void delete(long addr) { + if (true) delete(addr, null); + } + /** * Must check whether there are existing transactions which may access * this data, and if not free immediately, otherwise defer. */ - public void delete(long addr) { - final JournalTransactionService service = (JournalTransactionService) (localTransactionManager == null ? 
null - : localTransactionManager.getTransactionService()); + public void delete(long addr, IAllocationContext context) { final int rwaddr = decodeAddr(addr); final int sze = decodeSize(addr); - // FIXME: need to decide on correct way to handle transaction oriented - // allocations - if (true || service == null) { - m_store.free(rwaddr, sze); - } else { - /* - * May well be better to always defer and then free in batch, - * but for now need to confirm transaction logic - */ - m_store.deferFree(rwaddr, sze, m_rb.getLastCommitTime()); - } + m_store.free(rwaddr, sze, context); } + + public void detachContext(IAllocationContext context) { + m_store.detachContext(context); + } + public static class RWAddressManager implements IAddressManager { public int getByteCount(long addr) { @@ -428,8 +432,8 @@ try { m_store.checkRootBlock(rootBlock); - if (log.isInfoEnabled()) { - log.info("Writing new rootblock with commitCounter: " + if (log.isTraceEnabled()) { + log.trace("Writing new rootblock with commitCounter: " + rootBlock.getCommitCounter() + ", commitRecordAddr: " + rootBlock.getCommitRecordAddr() + ", commitRecordIndexAddr: " + rootBlock.getCommitRecordIndexAddr()); @@ -461,6 +465,8 @@ throw new IllegalStateException(); } try { + m_open = false; + m_store.close(); m_fileMetadata.raf.close(); m_fileMetadata.raf = null; @@ -563,6 +569,7 @@ m_fileMetadata.raf = new RandomAccessFile(m_fileMetadata.file, m_fileMetadata.fileMode); m_store = new RWStore(m_fmv, false, m_environment); // never read-only for now m_needsReopen = false; + m_open = true; } catch (Throwable t) { t.printStackTrace(); @@ -593,7 +600,8 @@ } public boolean isOpen() { - return m_fileMetadata.raf != null && m_fileMetadata.raf.getChannel().isOpen(); + // return m_fileMetadata.raf != null && m_fileMetadata.raf.getChannel().isOpen(); + return m_open; } public boolean isReadOnly() { @@ -714,4 +722,25 @@ m_store.setTransactionService((JournalTransactionService) localTransactionManager.getTransactionService()); } + public long getPhysicalAddress(long addr) { + int rwaddr = decodeAddr(addr); + + return m_store.physicalAddress(rwaddr); + } + + /** + * Saves the current list of delete blocks, returning the address allocated. + * This can be used later to retrieve the addresses of allocations to be + * freed. 
+ * + * @return the address of the delete blocks, or zero if none + */ + public long saveDeleteBlocks() { + return m_store.saveDeferrals(); + } + + public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { + m_store.setCommitRecordIndex(commitRecordIndex); + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -2240,4 +2240,8 @@ } + public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { + // NOP + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/AbstractRawStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/AbstractRawStore.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/AbstractRawStore.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import com.bigdata.LRUNexus; +import com.bigdata.journal.IAllocationContext; /** * Abstract base class for {@link IRawStore} implementations. This class uses a @@ -76,4 +77,23 @@ public void delete(long addr) { // NOP. } + + public void delete(long addr, IAllocationContext context) { + delete(addr); + } + + public long write(ByteBuffer data, IAllocationContext context) { + return write(data); + } + + public long write(ByteBuffer data, long oldAddr, IAllocationContext context) { + return write(data, oldAddr); + } + + /** + * The default implementation is a NOP. + */ + public void detachContext(IAllocationContext context) { + // NOP + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -37,6 +37,7 @@ import com.bigdata.counters.CounterSet; import com.bigdata.io.IByteArrayBuffer; import com.bigdata.journal.AbstractJournal; +import com.bigdata.journal.IAllocationContext; import com.bigdata.mdi.IResourceMetadata; /** @@ -124,6 +125,7 @@ public long write(ByteBuffer data); /** + * Write the data (unisolated). * * @param data * The data. The bytes from the current @@ -133,6 +135,43 @@ * {@link ByteBuffer#limit()} . The caller may subsequently * modify the contents of the buffer without changing the state * of the store (i.e., the data are copied into the store). + * + * @param context defines teh shadow AllocationContext from which this call + * was made + * + * @return A long integer formed that encodes both the offset from which the + * data may be read and the #of bytes to be read. See + * {@link IAddressManager}. + * + * @throws IllegalArgumentException + * if <i>data</i> is <code>null</code>. + * @throws IllegalArgumentException + * if <i>data</i> has zero bytes {@link ByteBuffer#remaining()}. + * @throws IllegalStateException + * if the store is not open. + * @throws IllegalStateException + * if the store does not allow writes. + * + * @todo define exception if the maximum extent would be exceeded. 
+ * + * @todo the addresses need to reflect the ascending offset at which the + * data are written, at least for a class of append only store. some + * stores, such as the Journal, also have an offset from the start of + * the file to the start of the data region (in the case of the + * Journal it is used to hold the root blocks). + */ + public long write(ByteBuffer data, IAllocationContext context); + + /** + * + * @param data + * The data. The bytes from the current + * {@link ByteBuffer#position()} to the + * {@link ByteBuffer#limit()} will be written and the + * {@link ByteBuffer#position()} will be advanced to the + * {@link ByteBuffer#limit()} . The caller may subsequently + * modify the contents of the buffer without changing the state + * of the store (i.e., the data are copied into the store). * @param oldAddr as returned from a previous write of the same object, or zero if a new write * * @return A long integer formed that encodes both the offset from which the @@ -141,6 +180,25 @@ */ public long write(ByteBuffer data, long oldAddr); + /** + * + * @param data + * The data. The bytes from the current + * {@link ByteBuffer#position()} to the + * {@link ByteBuffer#limit()} will be written and the + * {@link ByteBuffer#position()} will be advanced to the + * {@link ByteBuffer#limit()} . The caller may subsequently + * modify the contents of the buffer without changing the state + * of the store (i.e., the data are copied into the store). + * @param oldAddr as returned from a previous write of the same object, or zero if a new write + * @param context defines the shadow AllocationContext from which this call is made + * + * @return A long integer formed that encodes both the offset from which the + * data may be read and the #of bytes to be read. See + * {@link IAddressManager}. + */ + public long write(ByteBuffer data, long oldAddr, IAllocationContext context); + /** * Delete the data (unisolated). * <p> @@ -168,6 +226,48 @@ public void delete(long addr); /** + * Delete the data (unisolated). + * <p> + * After this operation subsequent reads on the address MAY fail and the + * caller MUST NOT depend on the ability to read at that address. + * + * @param addr + * A long integer formed using {@link Addr} that encodes both the + * offset at which the data was written and the #of bytes that + * were written. + * + * @param context + * Defines the shadow AllocationContext from which this call is + * made. For RWStore this can be used to immediately free the + * allocation if it can be determined to have orignally have + * been requested from the same context. + * + * @exception IllegalArgumentException + * If the address is known to be invalid (never written or + * deleted). Note that the address 0L is always invalid. + * + * It is only applicable in the + * context of a garbage collection strategy. With an append only + * store and with eviction of btrees into index segments there + * is no reason to delete anything on the store - and nothing to + * keep track of the delete. + * + * However, with a Read-Write store it is a requirement, and a void + * implementation is provided for other stores. + */ + public void delete(long addr, IAllocationContext context); + + /** + * + * @param context + * Defines the shadow AllocationContext that may have been used + * to allocate or delete storage. The RWStore assigns + * Allocation areas to specific contexts and these must be + * released for use by others. 
+ */ + public void detachContext(IAllocationContext context); + + /** * Read the data (unisolated). * * @param addr Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -43,8 +43,10 @@ import com.bigdata.journal.ConcurrencyManager; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.IIndexStore; +import com.bigdata.journal.IJournal; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; +import com.bigdata.journal.JournalShadow; import com.bigdata.journal.TimestampUtility; import com.bigdata.relation.IMutableRelation; import com.bigdata.relation.accesspath.ChunkConsumerIterator; @@ -503,6 +505,11 @@ * the mutation task will read. */ tx = jnl.newTx(lastCommitTime); + + /* + * Create the shadow journal to define the allocation context + */ + indexManager = new JournalShadow(jnl); // the timestamp that we will read on for this step. joinNexusFactory.setReadTimestamp(TimestampUtility Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -20,13 +20,14 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ + */ package com.bigdata.rwstore; import java.util.ArrayList; import com.bigdata.io.writecache.WriteCacheService; +import com.bigdata.journal.IAllocationContext; /** * Bit maps for an allocator. The allocator is a bit map managed as int[]s. @@ -58,6 +59,11 @@ */ int m_commit[]; /** + * If used as a shadow allocator, then the _commit is saved to m_saveCommit + * and m_transients is copied to m_commit. + */ + int m_saveCommit[]; + /** * Just the newly allocated bits. This will be copied onto {@link #m_commit} * when the current native transaction commits. 
*/ @@ -73,113 +79,152 @@ */ private final RWWriteCacheService m_writeCache; - AllocBlock(final int addrIsUnused, final int bitSize, final RWWriteCacheService cache) { - m_writeCache = cache; - m_ints = bitSize; - m_commit = new int[bitSize]; - m_bits = new int[bitSize]; - m_transients = new int[bitSize]; - } + AllocBlock(final int addrIsUnused, final int bitSize, final RWWriteCacheService cache) { + m_writeCache = cache; + m_ints = bitSize; + m_commit = new int[bitSize]; + m_bits = new int[bitSize]; + m_transients = new int[bitSize]; + } - public boolean verify(final int addr, final int size) { - if (addr < m_addr || addr >= (m_addr + (size * 32 * m_ints))) { - return false; - } + public boolean verify(final int addr, final int size) { + if (addr < m_addr || addr >= (m_addr + (size * 32 * m_ints))) { + return false; + } // Now check to see if it allocated - final int bit = (addr - m_addr) / size; + final int bit = (addr - m_addr) / size; - return RWStore.tstBit(m_bits, bit); - } + return RWStore.tstBit(m_bits, bit); + } - public boolean addressInRange(final int addr, final int size) { - return (addr >= m_addr && addr <= (m_addr + (size * 32 * m_ints))); - } - - public boolean free(final int addr, final int size) { - if (addr < m_addr || addr >= (m_addr + (size * 32 * m_ints))) { - return false; - } + public boolean addressInRange(final int addr, final int size) { + return (addr >= m_addr && addr <= (m_addr + (size * 32 * m_ints))); + } - freeBit((addr - m_addr) / size, addr); + public boolean free(final int addr, final int size) { + if (addr < m_addr || addr >= (m_addr + (size * 32 * m_ints))) { + return false; + } - return true; - } + freeBit((addr - m_addr) / size); - public boolean freeBit(final int bit, final long addr) { - // Allocation optimization - if bit NOT set in committed memory then clear - // the transient bit to permit reallocation within this transaction. - // - // Note that with buffered IO there is also an opportunity to avoid output to - // the file by removing any pending write to the now freed address. On large - // transaction scopes this may be significant. - RWStore.clrBit(m_bits, bit); - - if (!RWStore.tstBit(m_commit, bit)) { - // Should not be cleared here! - // m_writeCache.clearWrite(addr); + return true; + } - RWStore.clrBit(m_transients, bit); - - return true; - } else { - return false; - } - } + public boolean freeBit(final int bit) { + if (!RWStore.tstBit(m_bits, bit)) { + throw new IllegalArgumentException("Freeing bit not set"); + } + + // Allocation optimization - if bit NOT set in committed memory then + // clear + // the transient bit to permit reallocation within this transaction. + // + // Note that with buffered IO there is also an opportunity to avoid + // output to + // the file by removing any pending write to the now freed address. On + // large + // transaction scopes this may be significant. + RWStore.clrBit(m_bits, bit); - public int alloc(final int size) { - if (size < 0) { - throw new Error("Storage allocation error : negative size passed"); - } + if (!RWStore.tstBit(m_commit, bit)) { + RWStore.clrBit(m_transients, bit); - final int bit = RWStore.fndBit(m_transients, m_ints); + return true; + } else { + return false; + } + } - if (bit != -1) { - RWStore.setBit(m_bits, bit); - RWStore.setBit(m_transients, bit); + /** + * The shadow, if non-null defines the context for this request. + * + * If an existing shadow is registered, then the allocation fails + * immediately. 
+ * + * If no existing shadow is registered, and a new allocation can be made + * then this AllocBlock is registered with the shadow. + * + * Note that when shadows are used, an allocator on a free list may not have + * allocations available for all contexts, so the assumption that presence + * on the free list implies availability is not assertable. + */ - return bit; - } else { - return -1; - } - } + public int alloc(final int size) { + if (size < 0) { + throw new Error("Storage allocation error : negative size passed"); + } - public boolean hasFree() { - for (int i = 0; i < m_ints; i++) { - if (m_bits[i] != 0xFFFFFFFF) { - return true; - } - } + final int bit = RWStore.fndBit(m_transients, m_ints); - return false; - } + if (bit != -1) { + RWStore.setBit(m_bits, bit); + RWStore.setBit(m_transients, bit); + return bit; + } else { + return -1; + } + } + + public boolean hasFree() { + for (int i = 0; i < m_ints; i++) { + if (m_bits[i] != 0xFFFFFFFF) { + return true; + } + } + + return false; + } + public int getAllocBits() { - int total = m_ints * 32; - int allocBits = 0; - for (int i = 0; i < total; i++) { - if (RWStore.tstBit(m_bits, i)) { - allocBits++; - } - } - - return allocBits; + int total = m_ints * 32; + int allocBits = 0; + for (int i = 0; i < total; i++) { + if (RWStore.tstBit(m_bits, i)) { + allocBits++; + } + } + + return allocBits; } - public String getStats() { - final int total = m_ints * 32; - final int allocBits = getAllocBits(); + public String getStats() { + final int total = m_ints * 32; + final int allocBits = getAllocBits(); - return "Addr : " + m_addr + " [" + allocBits + "::" + total + "]"; - } + return "Addr : " + m_addr + " [" + allocBits + "::" + total + "]"; + } - public void addAddresses(final ArrayList addrs, final int rootAddr) { - final int total = m_ints * 32; - - for (int i = 0; i < total; i++) { - if (RWStore.tstBit(m_bits, i)) { - addrs.add(new Integer(rootAddr - i)); - } - } - } + public void addAddresses(final ArrayList addrs, final int rootAddr) { + final int total = m_ints * 32; + + for (int i = 0; i < total; i++) { + if (RWStore.tstBit(m_bits, i)) { + addrs.add(new Integer(rootAddr - i)); + } + } + } + + /** + * Store m_commit bits in m_saveCommit then duplicate transients to m_commit. + * + * This ensures, that while shadowed, the allocator will not re-use storage + * that was allocated prior to the shadow creation. + */ + public void shadow() { + m_saveCommit = m_commit; + m_commit = m_transients.clone(); + } + + /** + * The transient bits will have been added to correctly, we now just need to + * restore the commit bits from the m_saveCommit, to allow re-allocation + * of non-committed storage. 
+ */ + public void deshadow() { + m_commit = m_saveCommit; + m_saveCommit = null; + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; +import com.bigdata.journal.IAllocationContext; + public interface Allocator extends Comparable { public int getBlockSize(); public void setIndex(int index); @@ -35,7 +37,7 @@ public long getStartAddr(); public boolean addressInRange(int addr); public boolean free(int addr, int size); - public int alloc(RWStore store, int size); + public int alloc(RWStore store, int size, IAllocationContext context); public int getDiskAddr(); public void setDiskAddr(int addr); public long getPhysicalAddress(int offset); @@ -46,10 +48,10 @@ public boolean hasFree(); public void setFreeList(ArrayList list); public String getStats(AtomicLong counter); - public void preserveSessionData(); public void addAddresses(ArrayList addrs); public int getRawStartAddr(); public int getIndex(); public void appendShortStats(StringBuffer str); + public boolean canImmediatelyFree(int addr, int size, IAllocationContext context); } \ No newline at end of file Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -7,6 +7,7 @@ import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; +import com.bigdata.journal.IAllocationContext; import com.bigdata.util.ChecksumUtility; /** @@ -35,6 +36,8 @@ public BlobAllocator(RWStore store, int sortAddr) { m_store = store; m_sortAddr = sortAddr; + + System.out.println("New BlobAllocator"); } public void addAddresses(ArrayList addrs) { @@ -46,7 +49,7 @@ return false; } - public int alloc(RWStore store, int size) { + public int alloc(RWStore store, int size, IAllocationContext context) { assert size > m_store.m_maxFixedAlloc; return 0; @@ -92,7 +95,41 @@ return false; } + + public int getFirstFixedForBlob(int addr, int sze) { + if (sze < m_store.m_maxFixedAlloc) + throw new IllegalArgumentException("Unexpected address size"); + int alloc = m_store.m_maxFixedAlloc-4; + int blcks = (alloc - 1 + sze)/alloc; + + int hdr_idx = (-addr) & RWStore.OFFSET_BITS_MASK; + if (hdr_idx > m_hdrs.length) + throw new IllegalArgumentException("free BlobAllocation problem, hdr offset: " + hdr_idx + ", avail:" + m_hdrs.length); + + int hdr_addr = m_hdrs[hdr_idx]; + + if (hdr_addr == 0) { + throw new IllegalArgumentException("getFirstFixedForBlob called with unallocated address"); + } + + // read in header block, then free each reference + byte[] hdr = new byte[(blcks+1) * 4 + 4]; // add space for checksum + m_store.getData(hdr_addr, hdr); + + try { + DataInputStream instr = new DataInputStream( + new ByteArrayInputStream(hdr, 0, hdr.length-4) ); + int nallocs = instr.readInt(); + int faddr = instr.readInt(); + + return faddr; + + } catch (IOException ioe) { + throw new 
RuntimeException("Unable to retrieve first fixed address", ioe); + } + } + public int getBlockSize() { // Not relevant for Blobs return 0; @@ -269,4 +306,22 @@ return m_hdrs[offset] != 0; } + /** + * This is okay as a NOP. The true allocation is managed by the + * FixedAllocators. + */ + public void detachContext(IAllocationContext context) { + // NOP + } + + /** + * Since the real allocation is in the FixedAllocators, this should delegate + * to the first address, in which case + */ + public boolean canImmediatelyFree(int addr, int size, IAllocationContext context) { + int faddr = this.getFirstFixedForBlob(addr, size); + + return m_store.getBlockByAddress(faddr).canImmediatelyFree(faddr, 0, context); + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -30,6 +30,7 @@ import org.apache.log4j.Logger; +import com.bigdata.journal.IAllocationContext; import com.bigdata.util.ChecksumUtility; /** @@ -47,8 +48,6 @@ private int m_diskAddr; int m_index; - protected boolean m_preserveSession = false; - public void setIndex(int index) { AllocBlock fb = (AllocBlock) m_allocBlocks.get(0); if (log.isDebugEnabled()) @@ -57,10 +56,6 @@ m_index = index; } - public void preserveSessionData() { - m_preserveSession = true; - } - public long getStartAddr() { return RWStore.convertAddr(m_startAddr); } @@ -125,9 +120,31 @@ } } + volatile private IAllocationContext m_context; + public void setAllocationContext(IAllocationContext context) { + if (context == null && m_context != null) { + // restore commit bits in AllocBlocks + for (AllocBlock allocBlock : m_allocBlocks) { + allocBlock.deshadow(); + } + } else if (context != null & m_context == null) { + // restore commit bits in AllocBlocks + for (AllocBlock allocBlock : m_allocBlocks) { + allocBlock.shadow(); + } + } + m_context = context; + } + + /** + * write called on commit, so this is the point when "transient frees" - the + * freeing of previously committed memory can be made available since we + * are creating a new commit point - the condition being that m_freeBits + * was zero and m_freeTransients not. 
+ */ public byte[] write() { try { - final AllocBlock fb = (AllocBlock) m_allocBlocks.get(0); + final AllocBlock fb = m_allocBlocks.get(0); if (log.isDebugEnabled()) log.debug("writing allocator " + m_index + " for " + getStartAddr() + " with " + fb.m_bits[0]); final byte[] buf = new byte[1024]; @@ -144,17 +161,29 @@ str.writeInt(block.m_bits[i]); } - if (!m_preserveSession) { - block.m_transients = (int[]) block.m_bits.clone(); + if (!m_store.isSessionPreserved()) { + block.m_transients = block.m_bits.clone(); } - block.m_commit = (int[]) block.m_bits.clone(); + /** + * If this allocator is shadowed then copy the new + * committed state to m_saveCommit + */ + if (m_context != null) { + assert block.m_saveCommit != null; + + block.m_saveCommit = block.m_bits.clone(); + } else if (m_store.isSessionPreserved()) { + block.m_commit = block.m_transients.clone(); + } else { + block.m_commit = block.m_bits.clone(); + } } // add checksum final int chk = ChecksumUtility.getCHK().checksum(buf, str.size()); str.writeInt(chk); - if (!m_preserveSession) { + if (!m_store.isSessionPreserved()) { m_freeBits += m_freeTransients; // Handle re-addition to free list once transient frees are @@ -234,6 +263,8 @@ private final ArrayList<AllocBlock> m_allocBlocks; + private RWStore m_store; + /** * Calculating the number of ints (m_bitSize) cannot rely on a power of 2. Previously this * assumption was sufficient to guarantee a rounding on to an 64k boundary. However, now @@ -248,8 +279,9 @@ * @param preserveSessionData * @param cache */ - FixedAllocator(final int size, final boolean preserveSessionData, final RWWriteCacheService cache) { + FixedAllocator(final RWStore store, final int size, final RWWriteCacheService cache) { m_diskAddr = 0; + m_store = store; m_size = size; @@ -289,8 +321,6 @@ m_freeTransients = 0; m_freeBits = 32 * m_bitSize * numBlocks; - - m_preserveSession = preserveSessionData; } public String getStats(final AtomicLong counter) { @@ -351,7 +381,7 @@ final int block = offset/nbits; if (((AllocBlock) m_allocBlocks.get(block)) - .freeBit(offset % nbits, getPhysicalAddress(offset + 3))) { // bit adjust + .freeBit(offset % n... [truncated message content] |
[Bigdata-commit] SF.net SVN: bigdata:[3483]
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <tho...@us...> - 2010-09-01 16:29:13
Revision: 3483 http://bigdata.svn.sourceforge.net/bigdata/?rev=3483&view=rev Author: thompsonbry Date: 2010-09-01 16:29:07 +0000 (Wed, 01 Sep 2010) Log Message: ----------- Removed the use of the ShadowJournal from ProgramTask. It will be integrated into AbstractTask instead. Removed if(true) from Node. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java 2010-09-01 16:28:00 UTC (rev 3482) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java 2010-09-01 16:29:07 UTC (rev 3483) @@ -867,7 +867,7 @@ btree.storeCache.remove(oldChildAddr); } // free the oldChildAddr if the Strategy supports it - if (true) btree.store.delete(oldChildAddr); + btree.store.delete(oldChildAddr); // System.out.println("Deleting " + oldChildAddr); // Stash reference to the new child. Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java 2010-09-01 16:28:00 UTC (rev 3482) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java 2010-09-01 16:29:07 UTC (rev 3483) @@ -37,16 +37,13 @@ import org.apache.log4j.Logger; -import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.AbstractTask; import com.bigdata.journal.BufferMode; import com.bigdata.journal.ConcurrencyManager; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.IIndexStore; -import com.bigdata.journal.IJournal; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; -import com.bigdata.journal.JournalShadow; import com.bigdata.journal.TimestampUtility; import com.bigdata.relation.IMutableRelation; import com.bigdata.relation.accesspath.ChunkConsumerIterator; @@ -506,11 +503,6 @@ */ tx = jnl.newTx(lastCommitTime); - /* - * Create the shadow journal to define the allocation context - */ - indexManager = new JournalShadow(jnl); - // the timestamp that we will read on for this step. joinNexusFactory.setReadTimestamp(TimestampUtility .asHistoricalRead(lastCommitTime)); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
[Bigdata-commit] SF.net SVN: bigdata:[3491]
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <tho...@us...> - 2010-09-01 20:03:04
Revision: 3491 http://bigdata.svn.sourceforge.net/bigdata/?rev=3491&view=rev Author: thompsonbry Date: 2010-09-01 20:02:58 +0000 (Wed, 01 Sep 2010) Log Message: ----------- Added comments related to https://sourceforge.net/apps/trac/bigdata/ticket/151 and https://sourceforge.net/apps/trac/bigdata/ticket/152. Reconciled edits from Martyn in RWStrategy. Made freeImmediately() private. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IRootBlockView.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockCommitter.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IRootBlockView.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IRootBlockView.java 2010-09-01 18:43:58 UTC (rev 3490) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IRootBlockView.java 2010-09-01 20:02:58 UTC (rev 3491) @@ -112,9 +112,17 @@ * The root block version number. */ public int getVersion(); - + /** * The next offset at which a data item would be written on the store. + * + * FIXME The RWStore has different semantics for this field. Document those + * semantics and modify {@link AbstractJournal} so we can directly decide + * how many bytes were "written" (for the WORM) or were "allocated" (for the + * RWStore, in which case it should probably be the net of the bytes + * allocated and released). Update all the locations in the code which rely + * on {@link #getNextOffset()} to compute the #of bytes written onto the + * store. */ public long getNextOffset(); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-09-01 18:43:58 UTC (rev 3490) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-09-01 20:02:58 UTC (rev 3491) @@ -253,11 +253,12 @@ throw new IllegalArgumentException(); } - try { - long rwaddr = m_store.alloc(data.array(), nbytes, context); + try { /* FIXME [data] is not always backed by an array, the array may not be visible (read-only), the array offset may not be zero, etc. Try to drive the ByteBuffer into the RWStore.alloc() method instead. 
*/ + if(data.hasArray()&&data.arrayOffset()!=0)throw new AssertionError(); + final long rwaddr = m_store.alloc(data.array(), nbytes, context); data.position(nbytes); // update position to end of buffer - long retaddr = encodeAddr(rwaddr, nbytes); + final long retaddr = encodeAddr(rwaddr, nbytes); return retaddr; } catch (RuntimeException re) { @@ -305,7 +306,7 @@ } public void delete(long addr) { - if (true) delete(addr, null); + delete(addr, null); } /** Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockCommitter.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockCommitter.java 2010-09-01 18:43:58 UTC (rev 3490) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockCommitter.java 2010-09-01 20:02:58 UTC (rev 3491) @@ -38,7 +38,7 @@ public class RootBlockCommitter implements ICommitter { final AbstractJournal journal; - public RootBlockCommitter(AbstractJournal journal) { + public RootBlockCommitter(final AbstractJournal journal) { this.journal = journal; } @@ -46,9 +46,16 @@ * Write the current root block to the Journal and return its address * to be stored in the CommitRecord. */ - public long handleCommit(long commitTime) { - ByteBuffer rbv = journal.getRootBlockView().asReadOnlyBuffer(); - + public long handleCommit(final long commitTime) { + final ByteBuffer rbv = journal.getRootBlockView().asReadOnlyBuffer(); + /* + * FIXME There is an API issue with the RWStore which does not allow + * us to pass in a read-only buffer. Write unit tests for this on + * the core IRawStore test suite and fix the RWStore. Also write + * unit tests when the array backing the ByteBuffer can be accessed + * but has a non-zero array offset (a mutable slice of a ByteBuffer). + */ +// return journal.write(rbv); ByteBuffer bb = ByteBuffer.allocate(rbv.capacity()); for (int i = 0; i < rbv.capacity(); i++) { bb.put(rbv.get()); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-09-01 18:43:58 UTC (rev 3490) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-09-01 20:02:58 UTC (rev 3491) @@ -1084,6 +1084,7 @@ * @param sze */ public void free(final long laddr, final int sze, final IAllocationContext context) { +// if (true) return; final int addr = (int) laddr; switch (addr) { @@ -1118,7 +1119,7 @@ } - public void immediateFree(final int addr, final int sze) { + private void immediateFree(final int addr, final int sze) { switch (addr) { case 0: case -1: @@ -2621,7 +2622,7 @@ if (m_transactionService.getActiveCount() == 0) { return aged; - } else { + } else { return aged < earliest ? aged : earliest; } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
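Both FIXMEs in the r3491 diff above concern the same underlying issue: the RWStore currently expects a heap byte[] starting at array offset zero, which fails for read-only root block views and for sliced buffers. A hedged sketch of extracting a byte[] from an arbitrary ByteBuffer without those assumptions (helper name invented for illustration; not part of the bigdata code base):

    import java.nio.ByteBuffer;

    /** Illustrative helper only; not part of the bigdata code base. */
    public final class BufferUtil {

        /**
         * Copy the remaining bytes of a ByteBuffer into a fresh array without
         * assuming the buffer is array-backed, writable, or zero-offset.
         */
        public static byte[] toArray(final ByteBuffer b) {
            // A read-only duplicate has its own position, so the caller's
            // position and limit are not disturbed by the bulk get.
            final ByteBuffer view = b.asReadOnlyBuffer();
            final byte[] a = new byte[view.remaining()];
            view.get(a);
            return a;
        }
    }

Copying through a duplicate also avoids the per-byte position bookkeeping that the current loop in RootBlockCommitter has to perform.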
[Bigdata-commit] SF.net SVN: bigdata:[3763]
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <mar...@us...> - 2010-10-09 09:11:47
Revision: 3763 http://bigdata.svn.sourceforge.net/bigdata/?rev=3763&view=rev Author: martyncutcher Date: 2010-10-09 09:11:41 +0000 (Sat, 09 Oct 2010) Log Message: ----------- Call RootBlockCommitter directly rather than via registration and immediately free meta-rallocation blocks to support "null" commit. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockCommitter.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-10-09 01:36:31 UTC (rev 3762) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-10-09 09:11:41 UTC (rev 3763) @@ -633,6 +633,8 @@ private final long initialExtent; private final long minimumExtension; + private RootBlockCommitter m_rootBlockCommitter; + /** * The maximum extent before a {@link #commit()} will {@link #overflow()}. * In practice, overflow tries to trigger before this point in order to @@ -2300,8 +2302,13 @@ return 0L; } - + /* + * Explicitly call the RootBlockCommitter + */ + rootAddrs[PREV_ROOTBLOCK] = this.m_rootBlockCommitter.handleCommit(commitTime); + + /* * Write the commit record onto the store. * * @todo Modify to log the current root block and set the address of @@ -2767,9 +2774,11 @@ setupName2AddrBTree(getRootAddr(ROOT_NAME2ADDR)); /** - * Register committer to write previous root block + * Do not register committer to write previous root block, but + * instead just create it and call explicitly when required. This + * is a workaround to allow "void" transactions. */ - setCommitter(PREV_ROOTBLOCK, new RootBlockCommitter(this)); + m_rootBlockCommitter = new RootBlockCommitter(this); /** * If the strategy is a RWStrategy, then register the delete Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockCommitter.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockCommitter.java 2010-10-09 01:36:31 UTC (rev 3762) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockCommitter.java 2010-10-09 09:11:41 UTC (rev 3763) @@ -47,7 +47,9 @@ * to be stored in the CommitRecord. */ public long handleCommit(final long commitTime) { - final ByteBuffer rbv = journal.getRootBlockView().asReadOnlyBuffer(); + IRootBlockView view = journal.getRootBlockView(); + + final ByteBuffer rbv = view.asReadOnlyBuffer(); /* * FIXME There is an API issue with the RWStore which does not allow * us to pass in a read-only buffer. Write unit tests for this on Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-10-09 01:36:31 UTC (rev 3762) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-10-09 09:11:41 UTC (rev 3763) @@ -1584,7 +1584,9 @@ throw new IllegalStateException("Returned MetaBits Address not valid!"); } - free(oldMetaBits, oldMetaBitsSize); + // Call immediateFree - no need to defer freeof metaBits, this + // has to stop somewhere! 
+ immediateFree((int) oldMetaBits, oldMetaBitsSize); // save allocation headers Iterator iter = m_commitList.iterator(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
[Bigdata-commit] SF.net SVN: bigdata:[3858]
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <tho...@us...> - 2010-11-01 19:08:20
Revision: 3858 http://bigdata.svn.sourceforge.net/bigdata/?rev=3858&view=rev Author: thompsonbry Date: 2010-11-01 18:51:08 +0000 (Mon, 01 Nov 2010) Log Message: ----------- Modified AbstractJournal and RWStore to not expose getCommitRecordIndex() since that returns the live (mutable) version of that index. Instead, added getReadOnlyCommitRecordIndex() and modified RWStore to use the rangeIterator to visit, fetch, and deserialize the ICommitRecords. Took out some implementatons of methods left over when removing those methods from IBufferStrategy. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-11-01 17:48:31 UTC (rev 3857) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-11-01 18:51:08 UTC (rev 3858) @@ -673,11 +673,4 @@ return false; } - public void setTransactionManager(AbstractLocalTransactionManager localTransactionManager) { - // NOP - } - - public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { - // NOP - } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-11-01 17:48:31 UTC (rev 3857) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-11-01 18:51:08 UTC (rev 3858) @@ -2855,6 +2855,35 @@ } + /** + * Return a read-only view of the last committed state of the + * {@link CommitRecordIndex}. + * + * @return The read-only view of the {@link CommitRecordIndex}. + */ + public IIndex getReadOnlyCommitRecordIndex() { + + final ReadLock lock = _fieldReadWriteLock.readLock(); + + lock.lock(); + + try { + + assertOpen(); + + final CommitRecordIndex commitRecordIndex = getCommitRecordIndex(_rootBlock + .getCommitRecordIndexAddr()); + + return new ReadOnlyIndex(commitRecordIndex); + + } finally { + + lock.unlock(); + + } + + } + /** * Return the current state of the index that resolves timestamps to * {@link ICommitRecord}s. @@ -2871,7 +2900,7 @@ * {@link #getCommitRecord(long)} to obtain a distinct instance * suitable for read-only access. 
*/ - public CommitRecordIndex getCommitRecordIndex() { + protected CommitRecordIndex getCommitRecordIndex() { final ReadLock lock = _fieldReadWriteLock.readLock(); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java 2010-11-01 17:48:31 UTC (rev 3857) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java 2010-11-01 18:51:08 UTC (rev 3858) @@ -26,8 +26,6 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.Iterator; -import java.util.NoSuchElementException; import java.util.UUID; import com.bigdata.btree.BTree; @@ -698,34 +696,34 @@ } // CommitRecordIndexTupleSerializer - public Iterator<ICommitRecord> getCommitRecords(final long fromTime, final long toTime) { - return new Iterator<ICommitRecord>() { - ICommitRecord m_next = findNext(fromTime); - - public boolean hasNext() { - return m_next != null; - } +// public Iterator<ICommitRecord> getCommitRecords(final long fromTime, final long toTime) { +// return new Iterator<ICommitRecord>() { +// ICommitRecord m_next = findNext(fromTime); +// +// public boolean hasNext() { +// return m_next != null; +// } +// +// public ICommitRecord next() { +// if (m_next == null) { +// throw new NoSuchElementException(); +// } +// +// ICommitRecord ret = m_next; +// m_next = findNext(ret.getTimestamp()); +// +// if (m_next != null && m_next.getTimestamp() > toTime) { +// m_next = null; +// } +// +// return ret; +// } +// +// public void remove() { +// throw new RuntimeException("Invalid Operation"); +// } +// +// }; +// } - public ICommitRecord next() { - if (m_next == null) { - throw new NoSuchElementException(); - } - - ICommitRecord ret = m_next; - m_next = findNext(ret.getTimestamp()); - - if (m_next != null && m_next.getTimestamp() > toTime) { - m_next = null; - } - - return ret; - } - - public void remove() { - throw new RuntimeException("Invalid Operation"); - } - - }; - } - } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-11-01 17:48:31 UTC (rev 3857) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-11-01 18:51:08 UTC (rev 3858) @@ -299,7 +299,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id$ * @param <T> * * @todo report elapsed time and average latency for force, reopen, and @@ -1048,6 +1047,7 @@ * @todo Should be a NOP for the WORM? Check * {@link AbstractJournal#commitNow(long)} */ + @Override public void commit(IJournal journal) { flushWriteCache(); @@ -1060,6 +1060,7 @@ * Note: This assumes the caller is synchronized appropriately otherwise * writes belonging to other threads will be discarded from the cache! 
*/ + @Override public void abort() { if (writeCacheService != null) { @@ -2240,8 +2241,4 @@ } - public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { - // NOP - } - } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-01 17:48:31 UTC (rev 3857) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-01 18:51:08 UTC (rev 3858) @@ -38,7 +38,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -48,21 +47,25 @@ import org.apache.log4j.Logger; +import com.bigdata.btree.IIndex; +import com.bigdata.btree.ITuple; +import com.bigdata.btree.ITupleIterator; +import com.bigdata.btree.IndexMetadata; import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IReopenChannel; import com.bigdata.io.writecache.BufferedWrite; import com.bigdata.io.writecache.WriteCache; -import com.bigdata.journal.Journal; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.CommitRecordIndex; +import com.bigdata.journal.CommitRecordSerializer; import com.bigdata.journal.ForceEnum; import com.bigdata.journal.ICommitRecord; import com.bigdata.journal.IRootBlockView; +import com.bigdata.journal.Journal; import com.bigdata.journal.JournalTransactionService; import com.bigdata.journal.Options; import com.bigdata.journal.RWStrategy.FileMetadataView; import com.bigdata.quorum.Quorum; -import com.bigdata.util.ChecksumError; import com.bigdata.util.ChecksumUtility; /** @@ -322,7 +325,7 @@ * the same txReleaseTime. */ private static final int MAX_DEFERRED_FREE = 4094; // fits in 16k block - volatile long m_lastDeferredReleaseTime = 23; // zero is invalid time + volatile long m_lastDeferredReleaseTime = 0L;// = 23; // zero is invalid time final ArrayList<Integer> m_currentTxnFreeList = new ArrayList<Integer>(); final PSOutputStream m_deferredFreeOut; @@ -1568,7 +1571,7 @@ return "RWStore " + s_version; } - public void commitChanges(Journal journal) { + public void commitChanges(final Journal journal) { checkCoreAllocations(); // take allocation lock to prevent other threads allocating during commit @@ -1651,34 +1654,55 @@ /** * Called prior to commit, so check whether storage can be freed and then * whether the deferredheader needs to be saved. + * <p> + * Note: The caller MUST be holding the {@link #m_allocationLock}. */ - public void checkDeferredFrees(boolean freeNow, Journal journal) { - if (journal != null) { - final JournalTransactionService transactionService = (JournalTransactionService) journal.getLocalTransactionManager().getTransactionService(); - - // Commit can be called prior to Journal initialisation, in which case - // the commitRecordIndex will not be set. 
- final CommitRecordIndex commitRecordIndex = journal.getCommitRecordIndex(); - - long latestReleasableTime = System.currentTimeMillis(); - - if (transactionService != null) { - latestReleasableTime -= transactionService.getMinReleaseAge(); - } - - Iterator<ICommitRecord> records = commitRecordIndex.getCommitRecords(m_lastDeferredReleaseTime, latestReleasableTime); - - freeDeferrals(records); - } - } + /* public */void checkDeferredFrees(final boolean freeNow, + final Journal journal) { - /** - * - * @return conservative requirement for metabits storage, mindful that the - * request to allocate the metabits may require an increase in the - * number of allocation blocks and therefore an extension to the - * number of metabits. - */ + // Note: Invoked from unit test w/o the lock... +// if (!m_allocationLock.isHeldByCurrentThread()) +// throw new IllegalMonitorStateException(); + + if (journal != null) { + + final JournalTransactionService transactionService = (JournalTransactionService) journal + .getLocalTransactionManager().getTransactionService(); + + // the previous commit point. + long latestReleasableTime = journal.getLastCommitTime(); + + if (latestReleasableTime == 0L) { + // Nothing committed. + return; + } + + // subtract out the retention period. + latestReleasableTime -= transactionService.getMinReleaseAge(); + + // add one to give this inclusive upper bound semantics. + latestReleasableTime++; + + /* + * Free deferrals. + * + * Note: This adds one to the lastDeferredReleaseTime to give + * exclusive lower bound semantics. + */ + freeDeferrals(journal, m_lastDeferredReleaseTime+1, + latestReleasableTime); + + } + + } + + /** + * + * @return conservative requirement for metabits storage, mindful that the + * request to allocate the metabits may require an increase in the + * number of allocation blocks and therefore an extension to the + * number of metabits. 
+ */ private int getRequiredMetaBitsStorage() { int ints = 1 + m_allocSizes.length; // length prefixed alloc sizes ints += m_metaBits.length; @@ -2674,12 +2698,12 @@ } try { - Long freeTime = transactionService.tryCallWithLock(new Callable<Long>() { + final Long freeTime = transactionService.tryCallWithLock(new Callable<Long>() { public Long call() throws Exception { - long now = System.currentTimeMillis(); - long earliest = transactionService.getEarliestTxStartTime(); - long aged = now - transactionService.getMinReleaseAge(); + final long now = transactionService.nextTimestamp(); + final long earliest = transactionService.getEarliestTxStartTime(); + final long aged = now - transactionService.getMinReleaseAge(); if (transactionService.getActiveCount() == 0) { return aged; @@ -2733,14 +2757,14 @@ * Provided with the address of a block of addresses to be freed * @param blockAddr */ - protected void freeDeferrals(long blockAddr, long lastReleaseTime) { - int addr = (int) (blockAddr >> 32); - int sze = (int) blockAddr & 0xFFFFFF; + private void freeDeferrals(final long blockAddr, final long lastReleaseTime) { + final int addr = (int) (blockAddr >> 32); + final int sze = (int) blockAddr & 0xFFFFFF; if (log.isTraceEnabled()) log.trace("freeDeferrals at " + physicalAddress(addr) + ", size: " + sze + " releaseTime: " + lastReleaseTime); - byte[] buf = new byte[sze+4]; // allow for checksum + final byte[] buf = new byte[sze+4]; // allow for checksum getData(addr, buf); final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); m_allocationLock.lock(); @@ -2749,9 +2773,9 @@ while (nxtAddr != 0) { // while (false && addrs-- > 0) { - Allocator alloc = getBlock(nxtAddr); + final Allocator alloc = getBlock(nxtAddr); if (alloc instanceof BlobAllocator) { - int bloblen = strBuf.readInt(); + final int bloblen = strBuf.readInt(); assert bloblen > 0; // a Blob address MUST have a size immediateFree(nxtAddr, bloblen); @@ -2761,28 +2785,76 @@ nxtAddr = strBuf.readInt(); } - m_lastDeferredReleaseTime = lastReleaseTime; + m_lastDeferredReleaseTime = lastReleaseTime; + if (log.isTraceEnabled()) + log.trace("Updated m_lastDeferredReleaseTime=" + + m_lastDeferredReleaseTime); } catch (IOException e) { throw new RuntimeException("Problem freeing deferrals", e); } finally { m_allocationLock.unlock(); } } - - /** - * Provided with an iterator of CommitRecords, process each and free - * any deferred deletes associated with each. - * - * @param commitRecords - */ - public void freeDeferrals(Iterator<ICommitRecord> commitRecords) { - while (commitRecords.hasNext()) { - ICommitRecord record = commitRecords.next(); - long blockAddr = record.getRootAddr(AbstractJournal.DELETEBLOCK); - if (blockAddr != 0) { - freeDeferrals(blockAddr, record.getTimestamp()); + + /** + * Provided with an iterator of CommitRecords, process each and free any + * deferred deletes associated with each. + * + * @param journal + * @param fromTime + * The inclusive lower bound. + * @param toTime + * The exclusive upper bound. + */ + private void freeDeferrals(final AbstractJournal journal, + final long fromTime, + final long toTime) { + + final ITupleIterator<CommitRecordIndex.Entry> commitRecords; + { + /* + * Commit can be called prior to Journal initialisation, in which + * case the commitRecordIndex will not be set. 
+ */ + final IIndex commitRecordIndex = journal.getReadOnlyCommitRecordIndex(); + + final IndexMetadata metadata = commitRecordIndex + .getIndexMetadata(); + + final byte[] fromKey = metadata.getTupleSerializer() + .serializeKey(fromTime); + + final byte[] toKey = metadata.getTupleSerializer() + .serializeKey(toTime); + + commitRecords = commitRecordIndex + .rangeIterator(fromKey, toKey); + + } + + if(log.isTraceEnabled()) + log.trace("fromTime=" + fromTime + ", toTime=" + toTime); + + while (commitRecords.hasNext()) { + + final ITuple<CommitRecordIndex.Entry> tuple = commitRecords.next(); + + final CommitRecordIndex.Entry entry = tuple.getObject(); + + final ICommitRecord record = CommitRecordSerializer.INSTANCE + .deserialize(journal.read(entry.addr)); + + final long blockAddr = record + .getRootAddr(AbstractJournal.DELETEBLOCK); + + if (blockAddr != 0) { + + freeDeferrals(blockAddr, record.getTimestamp()); + } - } + + } + } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
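The reworked checkDeferredFrees() above turns the retention policy into a half-open time window and then sweeps the commit record index over that window. The window arithmetic on its own looks like the following (hedged sketch with made-up values; the real code takes the times from the Journal and the JournalTransactionService):

    /** Illustrative arithmetic only; the concrete values are made up. */
    public class DeferredFreeWindow {

        public static void main(final String[] args) {
            final long lastCommitTime = 1000000L;         // journal.getLastCommitTime()
            final long minReleaseAge = 10000L;            // transactionService.getMinReleaseAge()
            final long lastDeferredReleaseTime = 500000L; // m_lastDeferredReleaseTime

            if (lastCommitTime == 0L) {
                System.out.println("Nothing committed, so nothing can be released.");
                return;
            }

            // Subtract the retention period from the last commit point...
            long latestReleasableTime = lastCommitTime - minReleaseAge;
            // ...and add one so the value can be used as an exclusive upper bound.
            latestReleasableTime++;

            // Add one to the last processed release time so the lower bound is
            // strictly after the previous sweep.
            final long fromTime = lastDeferredReleaseTime + 1;

            System.out.println("Process commit records with timestamps in ["
                    + fromTime + ", " + latestReleasableTime + ")");
        }
    }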
[Bigdata-commit] SF.net SVN: bigdata:[3881]
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <tho...@us...> - 2010-11-03 16:20:27
Revision: 3881 http://bigdata.svn.sourceforge.net/bigdata/?rev=3881&view=rev Author: thompsonbry Date: 2010-11-03 16:20:20 +0000 (Wed, 03 Nov 2010) Log Message: ----------- Synching to Martyn. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockView.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/WormAddressManager.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWAddressManager.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-11-03 15:03:04 UTC (rev 3880) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-11-03 16:20:20 UTC (rev 3881) @@ -35,7 +35,6 @@ import org.apache.log4j.Logger; import com.bigdata.io.FileChannelUtility; -import com.bigdata.io.writecache.WriteCacheService; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.rawstore.AbstractRawWormStore; import com.bigdata.rawstore.Bytes; Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWAddressManager.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWAddressManager.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWAddressManager.java 2010-11-03 16:20:20 UTC (rev 3881) @@ -0,0 +1,28 @@ +package com.bigdata.journal; + +import com.bigdata.rawstore.IAddressManager; + +/** + * + * FIXME unit tests. 
+ */ +public class RWAddressManager implements IAddressManager { + + public int getByteCount(final long addr) { + return (int) (addr & 0xFFFFFFFFL); + } + + public long getOffset(final long addr) { + return addr >> 32; + } + + public long toAddr(final int nbytes, final long offset) { + return (offset << 32) + nbytes; + } + + public String toString(final long addr) { + return "{off=" + getOffset(addr) + ",len=" + getByteCount(addr) + + "}"; + } + +} \ No newline at end of file Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWAddressManager.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-11-03 15:03:04 UTC (rev 3880) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-11-03 16:20:20 UTC (rev 3881) @@ -67,6 +67,8 @@ private static final transient Logger log = Logger.getLogger(RWStrategy.class); + private final IAddressManager m_am = new RWAddressManager(); + final private FileMetadata m_fileMetadata; final private Quorum<?,?> m_environment; @@ -125,6 +127,7 @@ m_rb1 = copyRootBlock(false); m_initialExtent = m_fileMetadata.file.length(); + } /** @@ -368,53 +371,21 @@ } - /* - * FIXME Reconcile this class with the methods on the outer class. - */ - public static class RWAddressManager implements IAddressManager { - - public int getByteCount(final long addr) { - - return (int) addr & 0xFFFFFF; - - } - - public long getOffset(final long addr) { - - return -(addr >> 32); - - } - - public long toAddr(final int nbytes, long offset) { - - offset <<= 32; - - return offset + nbytes; - - } - - public String toString(final long addr) { - - return "{off=" + getOffset(addr) + ",len=" + getByteCount(addr) - + "}"; - - } - - } - - final private IAddressManager m_am = new RWAddressManager(); - - public IAddressManager getAddressManager() { - return m_am; - } - + /** + * Operation is not supported. + * + * @throws UnsupportedOperationException + * always. + */ public void closeForWrites() { - // TODO Auto-generated method stub + // @todo could be implemented at some point. throw new UnsupportedOperationException(); } public BufferMode getBufferMode() { - return BufferMode.DiskRW; + + return BufferMode.DiskRW; + } /** @@ -423,19 +394,22 @@ * points in hierarchies belonging to the caller. */ public CounterSet getCounters() { - return new CounterSet(); + + return new CounterSet(); + } public long getExtent() { - return this.m_fileMetadata.file.length(); + + return this.m_fileMetadata.file.length(); + } public int getHeaderSize() { - // TODO Auto-generated method stub - return 0; + return FileMetadata.headerSize0; } - private long m_initialExtent = 0; + final private long m_initialExtent; private volatile boolean m_needsReopen = false; @@ -454,31 +428,30 @@ public long getNextOffset() { return m_store.getNextOffset(); } - /** - * TODO: Should this mean the same - */ + public long getUserExtent() { - // TODO Auto-generated method stub - return 0; + + return m_store.getFileStorage(); + } + /** + * Operation is not supported. + * + * @throws UnsupportedOperationException + * always. 
+ */ public long transferTo(RandomAccessFile out) throws IOException { - // TODO Auto-generated method stub - return 0; + + // @todo could perhaps be implemented at some point. + throw new UnsupportedOperationException(); + } - /** - * This method means more to a WORM than a RW since it assumes an allocation strategy - */ - public long ensureMinFree(final long minFree) { - - throw new UnsupportedOperationException(); - - } - - public void truncate(long extent) { - // TODO Auto-generated method stub - + public void truncate(final long extent) { + + m_store.establishExtent(extent); + } public void writeRootBlock(final IRootBlockView rootBlock, @@ -702,61 +675,80 @@ return true; } + /** + * {@inheritDoc} + * <p> + * This implementation returns the amount of utilized storage. + */ public long size() { - // TODO Auto-generated method stub - return 0; + return m_store.getFileStorage(); } - public int getByteCount(long addr) { - return (int) addr & 0xFFFFFFFF; - } + /* + * IAddressManager + */ - public long getOffset(long addr) { - return addr >> 32; - } + public IAddressManager getAddressManager() { + return m_am; + } - public long toAddr(final int nbytes, final long offset) { - return (offset << 32) + nbytes; - } + public int getByteCount(final long addr) { + return m_am.getByteCount(addr); + } - public String toString(final long addr) { - return m_am.toString(addr); - } + public long getOffset(final long addr) { + return m_am.getOffset(addr); + } - /** - * The state of the provided block is not relevant since it does not hold - * information on recent allocations (the meta allocations will only effect the - * root block after a commit) - */ - public boolean requiresCommit(IRootBlockView block) { - return m_store.requiresCommit(); - } + public long toAddr(final int nbytes, final long offset) { + return m_am.toAddr(nbytes, offset); + } - public long getMetaBitsAddr() { - return m_store.getMetaBitsAddr(); - } + public String toString(final long addr) { + return m_am.toString(addr); + } + + /** + * {@inheritDoc} + * <p> + * The state of the provided block is not relevant since it does not hold + * information on recent allocations (the meta allocations will only effect + * the root block after a commit). This is passed through to the + * {@link RWStore} which examines its internal state. + */ + public boolean requiresCommit(final IRootBlockView block) { - public long getMetaStartAddr() { - return m_store.getMetaStartAddr(); - } + return m_store.requiresCommit(); + + } - /** - * Appears only to be used in unit tests. Return current max fix allocation block of 8K. - * - * FIXME: need to provide configurable allocation block sizes for the RWStore and this should access the same - * information. - */ + public long getMetaBitsAddr() { + + return m_store.getMetaBitsAddr(); + + } + + public long getMetaStartAddr() { + + return m_store.getMetaStartAddr(); + + } + public int getMaxRecordSize() { - return 8 * 1024; + + return m_store.getMaxAllocSize() - 4/* checksum */; + } - /** - * Although the RW Store uses a latched addressing strategy it is not meaningful to make this available - * in this interface. - */ - public int getOffsetBits() { - return 0; - } + /** + * Although the RW Store uses a latched addressing strategy it is not + * meaningful to make this available in this interface. + */ + public int getOffsetBits() { + + return 0; + + } /** * Used for unit tests, could also be used to access raw statistics. 
@@ -764,9 +756,35 @@ * @return the associated RWStore */ public RWStore getRWStore() { - return m_store; + + return m_store; + } + public long getPhysicalAddress(final long addr) { + + final int rwaddr = decodeAddr(addr); + + return m_store.physicalAddress(rwaddr); + } + + /** + * Saves the current list of delete blocks, returning the address allocated. + * This can be used later to retrieve the addresses of allocations to be + * freed. + * + * @return the address of the delete blocks, or zero if none + */ + public long saveDeleteBlocks() { + + return m_store.saveDeferrals(); + + } + + /* + * IHABufferStrategy + */ + // FIXME writeRawBuffer public void writeRawBuffer(HAWriteMessage msg, ByteBuffer b) throws IOException, InterruptedException { @@ -776,40 +794,26 @@ } // FIXME readFromLocalStore - public ByteBuffer readFromLocalStore(long addr) throws InterruptedException { - + public ByteBuffer readFromLocalStore(final long addr) + throws InterruptedException { + throw new UnsupportedOperationException(); - + } - /** - * Called from HAGlue.receiveAndReplicate to ensure the correct file extent - * prior to any writes. - * For RW this is essential as the allocaiton blocks for current committed data - * could otherwise be overwritten and the store invalidated. - * - * @see com.bigdata.journal.IHABufferStrategy#setExtentForLocalStore(long) - */ - public void setExtentForLocalStore(long extent) throws IOException, InterruptedException { - - m_store.establishHAExtent(extent); - - } + /** + * Called from HAGlue.receiveAndReplicate to ensure the correct file extent + * prior to any writes. For RW this is essential as the allocation blocks + * for current committed data could otherwise be overwritten and the store + * invalidated. + * + * @see com.bigdata.journal.IHABufferStrategy#setExtentForLocalStore(long) + */ + public void setExtentForLocalStore(final long extent) throws IOException, + InterruptedException { - public long getPhysicalAddress(long addr) { - int rwaddr = decodeAddr(addr); - - return m_store.physicalAddress(rwaddr); - } + m_store.establishExtent(extent); - /** - * Saves the current list of delete blocks, returning the address allocated. - * This can be used later to retrieve the addresses of allocations to be - * freed. 
- * - * @return the address of the delete blocks, or zero if none - */ - public long saveDeleteBlocks() { - return m_store.saveDeferrals(); - } + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockView.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockView.java 2010-11-03 15:03:04 UTC (rev 3880) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RootBlockView.java 2010-11-03 16:20:20 UTC (rev 3881) @@ -239,7 +239,7 @@ if(addr==0L) return; // TODO develop protocol to support address checking - if (am instanceof RWStrategy.RWAddressManager) return; + if (am instanceof RWAddressManager) return; final long offset = am.getOffset(addr); @@ -382,7 +382,7 @@ case RW: { // @todo check metaStartAddr // @todo check metaBitsAddr - am = new RWStrategy.RWAddressManager(); + am = new RWAddressManager(); // @todo check nextOffset break; } @@ -646,7 +646,7 @@ switch (getStoreType()) { case RW: { - am = new RWStrategy.RWAddressManager(); + am = new RWAddressManager(); break; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-11-03 15:03:04 UTC (rev 3880) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-11-03 16:20:20 UTC (rev 3881) @@ -162,7 +162,7 @@ * Extent of the file. This value should be valid since we obtain an * exclusive lock on the file when we open it. * - * @todo Atomic long to ensure visiblility of changes? + * @todo Atomic long to ensure visibility of changes? */ private long extent; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java 2010-11-03 15:03:04 UTC (rev 3880) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java 2010-11-03 16:20:20 UTC (rev 3881) @@ -38,7 +38,6 @@ import com.bigdata.io.IByteArrayBuffer; import com.bigdata.journal.AbstractJournal; import com.bigdata.mdi.IResourceMetadata; -import com.bigdata.rwstore.IAllocationContext; /** * <p> Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/WormAddressManager.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/WormAddressManager.java 2010-11-03 15:03:04 UTC (rev 3880) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/WormAddressManager.java 2010-11-03 16:20:20 UTC (rev 3881) @@ -27,18 +27,10 @@ package com.bigdata.rawstore; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.math.BigInteger; import java.text.NumberFormat; -import org.CognitiveWeb.extser.LongPacker; -import org.CognitiveWeb.extser.ShortPacker; - import com.bigdata.btree.IndexSegmentAddressManager; -import com.bigdata.io.DataInputBuffer; -import com.bigdata.io.DataOutputBuffer; /** * Encapsulates logic for operations on an opaque long integer comprising an Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- 
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-03 15:03:04 UTC (rev 3880) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-03 16:20:20 UTC (rev 3881) @@ -153,9 +153,9 @@ str.writeInt(m_size); - final Iterator iter = m_allocBlocks.iterator(); + final Iterator<AllocBlock> iter = m_allocBlocks.iterator(); while (iter.hasNext()) { - final AllocBlock block = (AllocBlock) iter.next(); + final AllocBlock block = iter.next(); str.writeInt(block.m_addr); for (int i = 0; i < m_bitSize; i++) { @@ -465,8 +465,9 @@ // Should have been first on list, now check for first if (m_freeList.size() > 0) { - FixedAllocator nxt = (FixedAllocator) m_freeList.get(0); - System.out.println("Freelist head: " + nxt.getSummaryStats()); + final FixedAllocator nxt = (FixedAllocator) m_freeList.get(0); + if (log.isInfoEnabled()) + log.info("Freelist head: " + nxt.getSummaryStats()); } } @@ -494,7 +495,7 @@ int baseAddr = -(m_index << 16); // bit adjust?? while (blocks.hasNext()) { - AllocBlock block = (AllocBlock) blocks.next(); + final AllocBlock block = (AllocBlock) blocks.next(); block.addAddresses(addrs, baseAddr); @@ -513,7 +514,8 @@ return m_index; } - public void appendShortStats(StringBuilder str, AllocationStats[] stats) { + public void appendShortStats(final StringBuilder str, + final AllocationStats[] stats) { int si = -1; @@ -528,9 +530,9 @@ } } - Iterator<AllocBlock> blocks = m_allocBlocks.iterator(); + final Iterator<AllocBlock> blocks = m_allocBlocks.iterator(); while (blocks.hasNext()) { - AllocBlock block = blocks.next(); + final AllocBlock block = blocks.next(); if (block.m_addr != 0) { str.append(block.getStats(si == -1 ? null : stats[si])); } else { @@ -542,7 +544,7 @@ public int getAllocatedBlocks() { int allocated = 0; - Iterator<AllocBlock> blocks = m_allocBlocks.iterator(); + final Iterator<AllocBlock> blocks = m_allocBlocks.iterator(); while (blocks.hasNext()) { if (blocks.next().m_addr != 0) { allocated++; @@ -569,18 +571,18 @@ } /** - * Computes the amount of staorge allocated using the freeBits count. + * Computes the amount of storage allocated using the freeBits count. 
* * @return the amount of storage to alloted slots in the allocation blocks */ public long getAllocatedSlots() { - int allocBlocks = getAllocatedBlocks(); + final int allocBlocks = getAllocatedBlocks(); int xtraFree = m_allocBlocks.size() - allocBlocks; xtraFree *= 32 * m_bitSize; - int freeBits = m_freeBits - xtraFree; + final int freeBits = m_freeBits - xtraFree; - long alloted = (allocBlocks * 32 * m_bitSize) - freeBits; + final long alloted = (allocBlocks * 32 * m_bitSize) - freeBits; return alloted * m_size; } @@ -588,22 +590,24 @@ public boolean isAllocated(int offset) { offset -= 3; - int allocBlockRange = 32 * m_bitSize; + final int allocBlockRange = 32 * m_bitSize; - AllocBlock block = (AllocBlock) m_allocBlocks.get(offset / allocBlockRange); - int bit = offset % allocBlockRange; + final AllocBlock block = (AllocBlock) m_allocBlocks.get(offset / allocBlockRange); + final int bit = offset % allocBlockRange; + return RWStore.tstBit(block.m_bits, bit); } public boolean isCommitted(int offset) { offset -= 3; - int allocBlockRange = 32 * m_bitSize; + final int allocBlockRange = 32 * m_bitSize; - AllocBlock block = (AllocBlock) m_allocBlocks.get(offset / allocBlockRange); - int bit = offset % allocBlockRange; + final AllocBlock block = (AllocBlock) m_allocBlocks.get(offset / allocBlockRange); + final int bit = offset % allocBlockRange; + return RWStore.tstBit(block.m_commit, bit); } @@ -611,7 +615,8 @@ * If the context is this allocators context AND it is not in the commit bits * then we can immediately free. */ - public boolean canImmediatelyFree(int addr, int size, IAllocationContext context) { + public boolean canImmediatelyFree(final int addr, final int size, + final IAllocationContext context) { if (context == m_context) { final int offset = ((-addr) & RWStore.OFFSET_BITS_MASK); // bit adjust Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-03 15:03:04 UTC (rev 3880) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-03 16:20:20 UTC (rev 3881) @@ -96,7 +96,7 @@ * ready to be freed. * <p> * The method of storing the allocation headers has been changed from always - * allocating at the end of the file (and moving them on fle extend) to + * allocating at the end of the file (and moving them on file extend) to * allocation of fixed areas. The meta-allocation data, containing the bitmap * that controls these allocations, is itself stored in the heap, and is now * structured to include both the bit data and the list of meta-storage @@ -198,14 +198,13 @@ * sizes, so a 4K boundary is expressed as <code>64</code>. The default * series of allocation sizes is based on the Fibonacci sequence, but is * pegged to the closest 4K boundary for values larger than 4k. + * + * @see #m_allocSizes */ private static final int[] DEFAULT_ALLOC_SIZES = { 1, 2, 3, 5, 8, 12, 16, 32, 48, 64, 128, 192, 320, 512, 832, 1344, 2176, 3520 }; // private static final int[] DEFAULT_ALLOC_SIZES = { 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181 }; // private static final int[] ALLOC_SIZES = { 1, 2, 4, 8, 16, 32, 64, 128 }; - final int m_maxFixedAlloc; - final int m_minFixedAlloc; - /** * The fixed size of any allocator on the disk in bytes. 
The #of allocations * managed by an allocator is this value times 8 because each slot uses one @@ -241,6 +240,10 @@ // protected int m_transactionCount; // private boolean m_committing; + /** + * FIXME This is initially true and is never set to false. Should this all + * go away? + */ private boolean m_preserveSession = true; // private boolean m_readOnly; @@ -268,7 +271,22 @@ private final RWWriteCacheService m_writeCache; + /** + * The actual allocation sizes as read from the store. + * + * @see #DEFAULT_ALLOC_SIZES + */ private int[] m_allocSizes; + + /** + * The maximum allocation size (bytes). + */ + final int m_maxFixedAlloc; + + /** + * The minimum allocation size (bytes). + */ + final int m_minFixedAlloc; /** * This lock is used to exclude readers when the extent of the backing file @@ -404,6 +422,7 @@ m_fmv = fileMetadataView; m_fd = m_fmv.getFile(); + m_raf = m_fmv.getRandomAccessFile(); m_rb = m_fmv.getRootBlock(); @@ -2279,12 +2298,12 @@ return alloc; } - private int blockIndex(int addr) { - return (-addr) >>> OFFSET_BITS; - } +// private int blockIndex(int addr) { +// return (-addr) >>> OFFSET_BITS; +// } private Allocator getBlock(final int addr) { - int index = (-addr) >>> OFFSET_BITS; + final int index = (-addr) >>> OFFSET_BITS; return (Allocator) m_allocs.get(index); } @@ -2293,24 +2312,24 @@ return (-addr) & OFFSET_BITS_MASK; // OFFSET_BITS } - public int addr2Size(final int addr) { - if (addr > 0) { - int size = 0; +// public int addr2Size(final int addr) { +// if (addr > 0) { +// int size = 0; +// +// final int index = ((int) addr) % 16; +// +// if (index == 15) { // blob +// throw new Error("FIX ME : legacy BLOB code being accessed somehow"); +// } else { +// size = m_minFixedAlloc * m_allocSizes[index]; +// } +// +// return size; +// } else { +// return getBlock(addr).getPhysicalSize(getOffset(addr)); +// } +// } - final int index = ((int) addr) % 16; - - if (index == 15) { // blob - throw new Error("FIX ME : legacy BLOB code being accessed somehow"); - } else { - size = m_minFixedAlloc * m_allocSizes[index]; - } - - return size; - } else { - return getBlock(addr).getPhysicalSize(getOffset(addr)); - } - } - public boolean isNativeAddress(final long addr) { return addr <= 0; } @@ -2636,14 +2655,15 @@ } /** - * Delegated to from setExtentForLocalStore after expected call from HAGlue.replicateAndReceive. + * Delegated to from setExtentForLocalStore after expected call from + * HAGlue.replicateAndReceive. * - * If the current file extent is different from the required extent then the call is made to move - * the allocation blocks. + * If the current file extent is different from the required extent then the + * call is made to move the allocation blocks. * * @param extent */ - public void establishHAExtent(final long extent) { + public void establishExtent(final long extent) { final long currentExtent = convertAddr(m_fileSize); @@ -3053,4 +3073,13 @@ return ret; } + /** + * The maximum allocation size (bytes). + */ + public int getMaxAllocSize() { + + return m_maxFixedAlloc; + + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
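RWAddressManager, added in the r3881 diff above, packs the offset into the high 32 bits of the long address and the byte count into the low 32 bits, and is flagged "FIXME unit tests". A round-trip check for that encoding could look like the sketch below (a plain main() is used here for brevity rather than the project's JUnit harness; the offset value is made up):

    import com.bigdata.journal.RWAddressManager;

    /** Sketch of a round-trip check; run with -ea to enable the asserts. */
    public class RWAddressManagerCheck {

        public static void main(final String[] args) {
            final RWAddressManager am = new RWAddressManager();

            final long offset = -12345L; // made-up value for illustration
            final int nbytes = 1024;

            final long addr = am.toAddr(nbytes, offset);

            // High 32 bits carry the offset, low 32 bits the byte count.
            assert am.getOffset(addr) == offset : am.getOffset(addr);
            assert am.getByteCount(addr) == nbytes : am.getByteCount(addr);

            System.out.println(am.toString(addr)); // {off=-12345,len=1024}
        }
    }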
[Bigdata-commit] SF.net SVN: bigdata:[3887]
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <tho...@us...> - 2010-11-04 12:45:25
Revision: 3887 http://bigdata.svn.sourceforge.net/bigdata/?rev=3887&view=rev Author: thompsonbry Date: 2010-11-04 12:45:18 +0000 (Thu, 04 Nov 2010) Log Message: ----------- Added some fields from AbstractBufferStrategy for error messages to RWStrategy. Added support for HA (readFromLocalStore and writeRawBuffer). Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-11-04 11:04:52 UTC (rev 3886) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-11-04 12:45:18 UTC (rev 3887) @@ -55,8 +55,8 @@ import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IReopenChannel; import com.bigdata.journal.AbstractBufferStrategy; -import com.bigdata.journal.DiskOnlyStrategy; import com.bigdata.journal.StoreTypeEnum; +import com.bigdata.journal.WORMStrategy; import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.rawstore.Bytes; import com.bigdata.rawstore.IRawStore; @@ -75,7 +75,7 @@ * <ol> * <li>Gathered writes. This case is used by the {@link RWStore}.</li> * <li>Pure append of sequentially allocated records. This case is used by the - * {@link DiskOnlyStrategy} (WORM) and by the {@link IndexSegmentBuilder}.</li> + * {@link WORMStrategy} (WORM) and by the {@link IndexSegmentBuilder}.</li> * <li>Write of a single large buffer owned by the caller. This case may be used * when the caller wants to manage the buffers or when the caller's buffer is * larger than the write cache.</li> @@ -1482,7 +1482,7 @@ /** * A {@link WriteCache} implementation suitable for an append-only file such - * as the {@link DiskOnlyStrategy} or the output file of the + * as the {@link WORMStrategy} or the output file of the * {@link IndexSegmentBuilder}. * * @author <a href="mailto:tho...@us...">Bryan @@ -1622,10 +1622,12 @@ * Called by WriteCacheService to process a direct write for large * blocks and also to flush data from dirty caches. 
*/ - protected boolean writeOnChannel(final ByteBuffer data, final long firstOffsetIgnored, - final Map<Long, RecordMetadata> recordMap, final long nanos) throws InterruptedException, IOException { + protected boolean writeOnChannel(final ByteBuffer data, + final long firstOffsetIgnored, + final Map<Long, RecordMetadata> recordMap, final long nanos) + throws InterruptedException, IOException { - final long begin = System.nanoTime(); + final long begin = System.nanoTime(); final int nbytes = data.remaining(); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-11-04 11:04:52 UTC (rev 3886) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-11-04 12:45:18 UTC (rev 3887) @@ -76,7 +76,7 @@ * offset plus record length exceeds the {@link #nextOffset} on which data * would be written may be easily detected. */ - protected static final String ERR_ADDRESS_NOT_WRITTEN = "Address never written."; + public static final String ERR_ADDRESS_NOT_WRITTEN = "Address never written."; /** * Text of the error message used when a ZERO (0L) is passed as an address @@ -99,19 +99,19 @@ * array or native memory (both are limited to int32 bytes since they * are addressed by a Java <code>int</code>). */ - protected static final String ERR_INT32 = "Would exceed int32 bytes (not allowed unless backed by disk)."; + public static final String ERR_INT32 = "Would exceed int32 bytes (not allowed unless backed by disk)."; /** * Text of the error message used when * {@link IBufferStrategy#truncate(long)} would truncate data that has * already been written. */ - protected static final String ERR_TRUNCATE = "Would truncate written data."; + public static final String ERR_TRUNCATE = "Would truncate written data."; /** * Error message used when the writes are not allowed. */ - protected static final String ERR_READ_ONLY = "Read only"; + public static final String ERR_READ_ONLY = "Read only"; /** * Error message used when the record size is invalid (e.g., negative). @@ -119,14 +119,21 @@ * @todo There is some overlap with {@link #ERR_RECORD_LENGTH_ZERO} and * {@link #ERR_BUFFER_EMPTY}. */ - protected static final String ERR_BAD_RECORD_SIZE = "Bad record size"; + public static final String ERR_BAD_RECORD_SIZE = "Bad record size"; /** - * Error message used when the store is closed. + * Error message used when the store is closed but the operation requires + * that the store is open. */ public static final String ERR_NOT_OPEN = "Not open"; /** + * Error message used when the store is open by the operation requires that + * the store is closed. + */ + public static final String ERR_OPEN = "Open"; + + /** * Error message used when an operation would write more data than would be * permitted onto a buffer. 
*/ Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-11-04 11:04:52 UTC (rev 3886) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-11-04 12:45:18 UTC (rev 3887) @@ -33,7 +33,7 @@ import org.apache.log4j.Logger; import com.bigdata.counters.CounterSet; -import com.bigdata.journal.WORMStrategy.StoreCounters; +import com.bigdata.ha.QuorumRead; import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.quorum.Quorum; @@ -41,6 +41,8 @@ import com.bigdata.rawstore.IAddressManager; import com.bigdata.rwstore.IAllocationContext; import com.bigdata.rwstore.RWStore; +import com.bigdata.rwstore.RWStore.StoreCounters; +import com.bigdata.util.ChecksumError; /** * A highly scalable persistent {@link IBufferStrategy} wrapping the @@ -89,15 +91,26 @@ */ final private long m_initialExtent; + /** + * The HA {@link Quorum} (optional). + */ + private final Quorum<?,?> m_quorum; + /** * * @param fileMetadata - * @param quorum + * @param quorum The HA {@link Quorum} (optional). */ RWStrategy(final FileMetadata fileMetadata, final Quorum<?, ?> quorum) { + if (fileMetadata == null) + throw new IllegalArgumentException(); + m_uuid = fileMetadata.rootBlock.getUUID(); + // MAY be null. + m_quorum = quorum; + m_store = new RWStore(fileMetadata, quorum); m_initialExtent = fileMetadata.file.length(); @@ -110,31 +123,50 @@ } - /* - * FIXME This does not handle the read-from-peer HA integration. See - * WORMStrategy#read(). - * - * FIXME This does not update the StoreCounters. - */ public ByteBuffer read(final long addr) { - final int rwaddr = decodeAddr(addr); - final int sze = decodeSize(addr); + try { + // Try reading from the local store. + return readFromLocalStore(addr); + } catch (InterruptedException e) { + // wrap and rethrow. + throw new RuntimeException(e); + } catch (ChecksumError e) { + /* + * Note: This assumes that the ChecksumError is not wrapped by + * another exception. If it is, then the ChecksumError would not be + * caught. + */ + // log the error. + try { + log.error(e + " : addr=" + toString(addr), e); + } catch (Throwable ignored) { + // ignore error in logging system. + } + // update the performance counters. + final StoreCounters<?> c = (StoreCounters<?>) m_store.getStoreCounters() + .acquire(); + try { + c.checksumErrorCount++; + } finally { + c.release(); + } + if (m_quorum != null && m_quorum.isHighlyAvailable()) { + if (m_quorum.isQuorumMet()) { + try { + // Read on another node in the quorum. + final byte[] a = ((QuorumRead<?>) m_quorum.getMember()) + .readFromQuorum(m_uuid, addr); + return ByteBuffer.wrap(a); + } catch (Throwable t) { + throw new RuntimeException("While handling: " + e, t); + } + } + } + // Otherwise rethrow the checksum error. 
+ throw e; + } - if (rwaddr == 0L || sze == 0) { - throw new IllegalArgumentException(); - } - - /** - * Allocate buffer to include checksum to allow single read - * but then return ByteBuffer excluding those bytes - */ - final byte buf[] = new byte[sze+4]; // 4 bytes for checksum - - m_store.getData(rwaddr, buf); - - return ByteBuffer.wrap(buf, 0, sze); - } public long write(final ByteBuffer data) { @@ -145,11 +177,9 @@ public long write(final ByteBuffer data, final IAllocationContext context) { - if (!isOpen()) - throw new IllegalStateException(); - - if (data == null) - throw new IllegalArgumentException(); + if (data == null) + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_BUFFER_NULL); if (data.hasArray() && data.arrayOffset() != 0) { /* @@ -166,7 +196,8 @@ final int nbytes = data.remaining(); if (nbytes == 0) - throw new IllegalArgumentException(); + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_BUFFER_EMPTY); final long rwaddr = m_store.alloc(data.array(), nbytes, context); @@ -208,9 +239,18 @@ * this data, and if not free immediately, otherwise defer. */ public void delete(final long addr, final IAllocationContext context) { + + final int rwaddr = decodeAddr(addr); - final int rwaddr = decodeAddr(addr); final int sze = decodeSize(addr); + + if (rwaddr == 0L) + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_ADDRESS_IS_NULL); + + if (sze == 0) + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_BAD_RECORD_SIZE); m_store.free(rwaddr, sze, context); @@ -316,10 +356,10 @@ private void assertOpen() { if (!m_store.isOpen()) - throw new IllegalStateException(); - + throw new IllegalStateException(AbstractBufferStrategy.ERR_NOT_OPEN); + } - + public void close() { // throw exception if open per the API. @@ -332,7 +372,7 @@ public void deleteResources() { if (m_store.isOpen()) - throw new IllegalStateException(); + throw new IllegalStateException(AbstractBufferStrategy.ERR_OPEN); final File file = m_store.getStoreFile(); @@ -534,25 +574,39 @@ /* * IHABufferStrategy */ - - /** - * Operation is not supported. - */ - public void writeRawBuffer(HAWriteMessage msg, ByteBuffer b) + + public void writeRawBuffer(final HAWriteMessage msg, final ByteBuffer b) throws IOException, InterruptedException { - throw new UnsupportedOperationException(); + m_store.writeRawBuffer(msg, b); } - /** - * Operation is not supported. 
- */ public ByteBuffer readFromLocalStore(final long addr) throws InterruptedException { - throw new UnsupportedOperationException(); + final int rwaddr = decodeAddr(addr); + final int sze = decodeSize(addr); + + if (rwaddr == 0L) + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_ADDRESS_IS_NULL); + + if (sze == 0) + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_BAD_RECORD_SIZE); + + /** + * Allocate buffer to include checksum to allow single read but then + * return ByteBuffer excluding those bytes + */ + final byte buf[] = new byte[sze + 4]; // 4 bytes for checksum + + m_store.getData(rwaddr, buf); + + return ByteBuffer.wrap(buf, 0, sze); + } /** Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-11-04 11:04:52 UTC (rev 3886) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-11-04 12:45:18 UTC (rev 3887) @@ -2231,9 +2231,6 @@ } - /** - * Extend file if required for HAWriteMessage - just call through to truncate - */ public void setExtentForLocalStore(final long extent) throws IOException, InterruptedException { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-04 11:04:52 UTC (rev 3886) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-04 12:45:18 UTC (rev 3887) @@ -59,6 +59,8 @@ import com.bigdata.io.IReopenChannel; import com.bigdata.io.writecache.BufferedWrite; import com.bigdata.io.writecache.WriteCache; +import com.bigdata.io.writecache.WriteCacheService; +import com.bigdata.journal.AbstractBufferStrategy; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.CommitRecordIndex; import com.bigdata.journal.CommitRecordSerializer; @@ -71,7 +73,7 @@ import com.bigdata.journal.Options; import com.bigdata.journal.RootBlockUtility; import com.bigdata.journal.RootBlockView; -import com.bigdata.journal.WORMStrategy.StoreCounters; +import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.quorum.Quorum; import com.bigdata.rawstore.IRawStore; import com.bigdata.util.ChecksumUtility; @@ -376,31 +378,23 @@ } - /* - * FIXME Update counters when writing on the disk. + /** + * {@inheritDoc} + * <p> + * Note: The performance counters for writes to the disk are reported by + * the {@link WriteCacheService}. The {@link RWStore} never writes + * directly onto the disk (other than the root blocks). */ @Override protected boolean writeOnChannel(final ByteBuffer data, final long firstOffsetignored, final Map<Long, RecordMetadata> recordMap, final long nanos) throws InterruptedException, IOException { -// final long begin = System.nanoTime(); final Lock readLock = m_extensionLock.readLock(); readLock.lock(); try { boolean ret = super.writeOnChannel(data, firstOffsetignored, recordMap, nanos); -// // Update counters. 
-// final long elapsed = (System.nanoTime() - begin); -// final StoreCounters<?> c = (StoreCounters<?>) storeCounters.get() -// .acquire(); -// try { -// c.ndiskWrite += nwrites; -// c.bytesWrittenOnDisk += nbytes; -// c.elapsedDiskWriteNanos += elapsed; -// } finally { -// c.release(); -// } return ret; } finally { readLock.unlock(); @@ -416,14 +410,6 @@ }; -// private String m_filename; - -// private final FileMetadataView m_fmv; - -// private volatile IRootBlockView m_rb; - -// volatile private long m_commitCounter; - volatile private int m_metaBitsAddr; /** @@ -571,8 +557,10 @@ } private void assertOpen() { - if(!m_open) - throw new IllegalStateException(); + + if (!m_open) + throw new IllegalStateException(AbstractBufferStrategy.ERR_NOT_OPEN); + } synchronized public void close() { @@ -987,7 +975,9 @@ * address of the BlobHeader record. */ public void getData(final long addr, final byte buf[]) { - getData(addr, buf, 0, buf.length); + + getData(addr, buf, 0, buf.length); + } public void getData(final long addr, final byte buf[], final int offset, @@ -999,6 +989,8 @@ return; } + final long begin = System.nanoTime(); + final Lock readLock = m_extensionLock.readLock(); readLock.lock(); @@ -1048,23 +1040,43 @@ } } - try { - final long paddr = physicalAddress((int) addr); - if (paddr == 0) { - assertAllocators(); - - log.warn("Address " + addr + " did not resolve to physical address"); - - throw new IllegalArgumentException("Address " + addr + " did not resolve to physical address"); + { + final StoreCounters<?> storeCounters = (StoreCounters<?>) this.storeCounters + .get().acquire(); + try { + final int nbytes = length; + if (nbytes > storeCounters.maxReadSize) { + storeCounters.maxReadSize = nbytes; + } + } finally { + storeCounters.release(); + } + } + + try { + + final long paddr = physicalAddress((int) addr); + + if (paddr == 0) { + + assertAllocators(); + + final String msg = "Address did not resolve to physical address: " + + addr; + + log.warn(msg); + + throw new IllegalArgumentException(msg); + } - /** - * Check WriteCache first - * - * Note that the buffer passed in should include the checksum - * value, so the cached data is 4 bytes less than the - * buffer size. - */ + /** + * Check WriteCache first + * + * Note that the buffer passed in should include the checksum + * value, so the cached data is 4 bytes less than the buffer + * size. + */ final ByteBuffer bbuf; try { bbuf = m_writeCache.read(paddr); @@ -1089,7 +1101,24 @@ buf[offset+i] = in[i]; } m_cacheReads++; + /* + * Hit on the write cache. + * + * Update the store counters. + */ + final StoreCounters<?> c = (StoreCounters<?>) storeCounters + .get().acquire(); + try { + final int nbytes = length; + c.nreads++; + c.bytesRead += nbytes; + c.elapsedReadNanos += (System.nanoTime() - begin); + } finally { + c.release(); + } } else { + // Read through to the disk. + final long beginDisk = System.nanoTime(); // If checksum is required then the buffer should be sized to include checksum in final 4 bytes final ByteBuffer bb = ByteBuffer.wrap(buf, offset, length); FileChannelUtility.readAll(m_reopener, bb, paddr); @@ -1111,6 +1140,19 @@ } m_diskReads++; + // Update counters. 
+ final StoreCounters<?> c = (StoreCounters<?>) storeCounters.get() + .acquire(); + try { + final int nbytes = length; + c.nreads++; + c.bytesRead += nbytes; + c.bytesReadFromDisk += nbytes; + c.elapsedReadNanos += (System.nanoTime() - begin); + c.elapsedDiskReadNanos += (System.nanoTime() - beginDisk); + } finally { + c.release(); + } } } catch (Throwable e) { log.error(e,e); @@ -3232,10 +3274,14 @@ return tmp; } - + /** * Striped performance counters for {@link IRawStore} access, including * operations that read or write through to the underlying media. + * <p> + * Note: The performance counters for writes to the disk are reported by the + * {@link WriteCacheService}. The {@link RWStore} never writes directly onto + * the disk (other than the root blocks). * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> @@ -3243,8 +3289,8 @@ * * @todo report elapsed time and average latency for force, reopen, and * writeRootBlock. - * - * FIXME CAT may be much faster than striped locks (2-3x faster). + * + * FIXME CAT may be much faster than striped locks (2-3x faster). */ static public class StoreCounters<T extends StoreCounters<T>> extends StripedCounters<T> { @@ -3289,10 +3335,11 @@ */ public volatile long nwrites; - /** - * #of write requests that write through to the backing file. - */ - public volatile long ndiskWrite; + // This is reported by the WriteCacheService. +// /** +// * #of write requests that write through to the backing file. +// */ +// public volatile long ndiskWrite; /** * The size of the largest record read. @@ -3308,21 +3355,23 @@ * #of bytes written. */ public volatile long bytesWritten; + + // This is reported by the WriteCacheService. +// /** +// * #of bytes that have been written on the disk. +// */ +// public volatile long bytesWrittenOnDisk; /** - * #of bytes that have been written on the disk. - */ - public volatile long bytesWrittenOnDisk; - - /** * Total elapsed time for writes. */ public volatile long elapsedWriteNanos; - /** - * Total elapsed time for writing on the disk. - */ - public volatile long elapsedDiskWriteNanos; + // This is reported by the WriteCacheService. +// /** +// * Total elapsed time for writing on the disk. +// */ +// public volatile long elapsedDiskWriteNanos; /** * #of times the data were forced to the disk. @@ -3381,12 +3430,12 @@ checksumErrorCount += o.checksumErrorCount; nwrites += o.nwrites; - ndiskWrite += o.ndiskWrite; +// ndiskWrite += o.ndiskWrite; maxWriteSize = Math.max(maxWriteSize, o.maxWriteSize); bytesWritten += o.bytesWritten; - bytesWrittenOnDisk += o.bytesWrittenOnDisk; +// bytesWrittenOnDisk += o.bytesWrittenOnDisk; elapsedWriteNanos += o.elapsedWriteNanos; - elapsedDiskWriteNanos += o.elapsedDiskWriteNanos; +// elapsedDiskWriteNanos += o.elapsedDiskWriteNanos; nforce += o.nforce; ntruncate += o.ntruncate; @@ -3412,12 +3461,12 @@ t.checksumErrorCount -= o.checksumErrorCount; t.nwrites -= o.nwrites; - t.ndiskWrite -= o.ndiskWrite; +// t.ndiskWrite -= o.ndiskWrite; t.maxWriteSize -= o.maxWriteSize; // @todo report max? min? 
t.bytesWritten -= o.bytesWritten; - t.bytesWrittenOnDisk -= o.bytesWrittenOnDisk; +// t.bytesWrittenOnDisk -= o.bytesWrittenOnDisk; t.elapsedWriteNanos -= o.elapsedWriteNanos; - t.elapsedDiskWriteNanos -= o.elapsedDiskWriteNanos; +// t.elapsedDiskWriteNanos -= o.elapsedDiskWriteNanos; t.nforce -= o.nforce; t.ntruncate -= o.ntruncate; @@ -3442,12 +3491,12 @@ checksumErrorCount = 0; nwrites = 0; - ndiskWrite = 0; +// ndiskWrite = 0; maxWriteSize = 0; bytesWritten = 0; - bytesWrittenOnDisk = 0; +// bytesWrittenOnDisk = 0; elapsedWriteNanos = 0; - elapsedDiskWriteNanos = 0; +// elapsedDiskWriteNanos = 0; nforce = 0; ntruncate = 0; @@ -3605,51 +3654,51 @@ * write */ - disk.addCounter("nwrites", new Instrument<Long>() { - public void sample() { - setValue(ndiskWrite); - } - }); +// disk.addCounter("nwrites", new Instrument<Long>() { +// public void sample() { +// setValue(ndiskWrite); +// } +// }); +// +// disk.addCounter("bytesWritten", new Instrument<Long>() { +// public void sample() { +// setValue(bytesWrittenOnDisk); +// } +// }); +// +// disk.addCounter("bytesPerWrite", new Instrument<Double>() { +// public void sample() { +// final double bytesPerDiskWrite = (ndiskWrite == 0 ? 0d +// : (bytesWrittenOnDisk / (double) ndiskWrite)); +// setValue(bytesPerDiskWrite); +// } +// }); +// +// disk.addCounter("writeSecs", new Instrument<Double>() { +// public void sample() { +// final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); +// setValue(diskWriteSecs); +// } +// }); +// +// disk.addCounter("bytesWrittenPerSec", new Instrument<Double>() { +// public void sample() { +// final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); +// final double bytesWrittenPerSec = (diskWriteSecs == 0L ? 0d +// : bytesWrittenOnDisk / diskWriteSecs); +// setValue(bytesWrittenPerSec); +// } +// }); +// +// disk.addCounter("secsPerWrite", new Instrument<Double>() { +// public void sample() { +// final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); +// final double writeLatency = (diskWriteSecs == 0 ? 0d +// : diskWriteSecs / ndiskWrite); +// setValue(writeLatency); +// } +// }); - disk.addCounter("bytesWritten", new Instrument<Long>() { - public void sample() { - setValue(bytesWrittenOnDisk); - } - }); - - disk.addCounter("bytesPerWrite", new Instrument<Double>() { - public void sample() { - final double bytesPerDiskWrite = (ndiskWrite == 0 ? 0d - : (bytesWrittenOnDisk / (double) ndiskWrite)); - setValue(bytesPerDiskWrite); - } - }); - - disk.addCounter("writeSecs", new Instrument<Double>() { - public void sample() { - final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); - setValue(diskWriteSecs); - } - }); - - disk.addCounter("bytesWrittenPerSec", new Instrument<Double>() { - public void sample() { - final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); - final double bytesWrittenPerSec = (diskWriteSecs == 0L ? 0d - : bytesWrittenOnDisk / diskWriteSecs); - setValue(bytesWrittenPerSec); - } - }); - - disk.addCounter("secsPerWrite", new Instrument<Double>() { - public void sample() { - final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); - final double writeLatency = (diskWriteSecs == 0 ? 
0d - : diskWriteSecs / ndiskWrite); - setValue(writeLatency); - } - }); - /* * other */ @@ -3752,4 +3801,12 @@ } + public void writeRawBuffer(final HAWriteMessage msg, final ByteBuffer b) + throws IOException, InterruptedException { + + m_writeCache.newWriteCache(b, true/* useChecksums */, + true/* bufferHasData */, m_reopener).flush(false/* force */); + + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
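The centrepiece of r3887 is the split of RWStrategy.read() into a local read with an HA fallback: the local path reads the record together with its 4-byte checksum in a single pass, and only a ChecksumError causes the strategy to ask another node in the quorum for the record. The local half is worth seeing on its own; the sketch below is condensed from the diff, with decodeAddr()/decodeSize() and the m_store field assumed to be the surrounding RWStrategy members rather than redefined here.

    // Checksum-aware local read, condensed from the r3887 diff.
    public ByteBuffer readFromLocalStore(final long addr) throws InterruptedException {

        final int rwaddr = decodeAddr(addr); // allocator-relative address
        final int sze = decodeSize(addr);    // user record length in bytes

        if (rwaddr == 0L || sze == 0)
            throw new IllegalArgumentException();

        // Read the record plus its 4-byte checksum in one pass ...
        final byte[] buf = new byte[sze + 4];
        m_store.getData(rwaddr, buf);

        // ... but expose only the user bytes to the caller.
        return ByteBuffer.wrap(buf, 0, sze);
    }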
[Bigdata-commit] SF.net SVN: bigdata:[3900] branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <tho...@us...> - 2010-11-05 15:44:05
Revision: 3900 http://bigdata.svn.sourceforge.net/bigdata/?rev=3900&view=rev Author: thompsonbry Date: 2010-11-05 15:43:58 +0000 (Fri, 05 Nov 2010) Log Message: ----------- Conditional invocation of the IAllocationContext specific methods which delegate to the RWStrategy. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Removed Paths: ------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/JournalShadow.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-11-04 23:20:34 UTC (rev 3899) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-11-05 15:43:58 UTC (rev 3900) @@ -2612,14 +2612,39 @@ } - public long write(ByteBuffer data, final long oldAddr, IAllocationContext context) { - return ((RWStrategy)_bufferStrategy).write(data, oldAddr, context); - } + public long write(final ByteBuffer data, final long oldAddr, + final IAllocationContext context) { - public long write(ByteBuffer data, IAllocationContext context) { - return ((RWStrategy)_bufferStrategy).write(data, context); - } + assertCanWrite(); + if (_bufferStrategy instanceof RWStrategy) { + + return ((RWStrategy) _bufferStrategy).write(data, oldAddr, context); + + } else { + + return _bufferStrategy.write(data, oldAddr); + + } + + } + + public long write(final ByteBuffer data, final IAllocationContext context) { + + assertCanWrite(); + + if (_bufferStrategy instanceof RWStrategy) { + + return ((RWStrategy) _bufferStrategy).write(data, context); + + } else { + + return _bufferStrategy.write(data); + + } + + } + // Note: NOP for WORM. Used by RW for eventual recycle protocol. 
public void delete(final long addr) { @@ -2629,17 +2654,31 @@ } - public void delete(final long addr, IAllocationContext context) { + public void delete(final long addr, final IAllocationContext context) { assertCanWrite(); - ((RWStrategy) _bufferStrategy).delete(addr, context); + if(_bufferStrategy instanceof RWStrategy) { + + ((RWStrategy) _bufferStrategy).delete(addr, context); + + } else { + + _bufferStrategy.delete(addr); + + } } public void detachContext(final IAllocationContext context) { - ((RWStrategy) _bufferStrategy).detachContext(context); + assertCanWrite(); + + if(_bufferStrategy instanceof RWStrategy) { + + ((RWStrategy) _bufferStrategy).detachContext(context); + + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java 2010-11-04 23:20:34 UTC (rev 3899) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java 2010-11-05 15:43:58 UTC (rev 3900) @@ -38,7 +38,6 @@ import com.bigdata.counters.CounterSet; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.relation.locator.IResourceLocator; -import com.bigdata.rwstore.IAllocationContext; import com.bigdata.sparse.SparseRowStore; public class JournalDelegate implements IJournal { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-11-04 23:20:34 UTC (rev 3899) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-11-05 15:43:58 UTC (rev 3900) @@ -40,7 +40,6 @@ import com.bigdata.rawstore.AbstractRawStore; import com.bigdata.rawstore.IAddressManager; import com.bigdata.rwstore.IAllocationContext; -import com.bigdata.rwstore.JournalShadow; import com.bigdata.rwstore.RWStore; import com.bigdata.rwstore.RWStore.StoreCounters; import com.bigdata.util.ChecksumError; @@ -183,8 +182,6 @@ * {@link RWStore}. Shadow allocators may be used to isolate allocation * changes (both allocating slots and releasing slots) across different * processes. - * - * @see JournalShadow */ @Override public long write(final ByteBuffer data, final IAllocationContext context) { Deleted: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/JournalShadow.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/JournalShadow.java 2010-11-04 23:20:34 UTC (rev 3899) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/JournalShadow.java 2010-11-05 15:43:58 UTC (rev 3900) @@ -1,126 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. 
- -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ - -package com.bigdata.rwstore; - -import java.nio.ByteBuffer; - -import com.bigdata.journal.AbstractJournal; -import com.bigdata.journal.IBufferStrategy; -import com.bigdata.journal.IJournal; -import com.bigdata.journal.JournalDelegate; -import com.bigdata.journal.RWStrategy; - -/** - * A {@link JournalShadow} wraps an Journal but provides itself as the - * allocation context to be passed through to any interested - * {@link IBufferStrategy}. This is the path by which {@link RWStore} allocators - * are provided with the context for the allocations and deletes made. - * - * @author Martyn Cutcher - */ -public class JournalShadow extends JournalDelegate implements IAllocationContext { - -// private final static AtomicLong s_idCounter = new AtomicLong(23); -// -// final private int m_id = (int) s_idCounter.incrementAndGet(); - - private JournalShadow(final AbstractJournal source) { - - super(source); - - } - - public long write(final ByteBuffer data) { - - return delegate.write(data, this); - - } - - public long write(final ByteBuffer data, final long oldAddr) { - - return delegate.write(data, oldAddr, this); - - } - - public void delete(long oldAddr) { - - delegate.delete(oldAddr, this); - - } - -// public int compareTo(Object o) { -// if (o instanceof JournalShadow) { -// JournalShadow js = (JournalShadow) o; -// return m_id - js.m_id; -// } else { -// return -1; -// } -// } - -// /** -// * TODO: should retrieve from localTransactionService or Journal -// * properties -// */ -// public long minimumReleaseTime() { -// return 0; -// } - - /** - * Release itself from the wrapped Journal, this unlocks the allocator for - * the RWStore - */ - public void detach() { - - delegate.detachContext(this); - - } - - /** - * This factory pattern creates a shadow for a RWStrategy-backed Journal to - * support protected allocations while allowing for deletion and - * re-allocation where possible. If the Journal is not backed by a - * RWStrategy, then the original Journal is returned. - * - * @param journal - * The journal to be shadowed - * - * @return The shadowed journal if necessary and otherwise the - * <i>journal</i>. - */ - public static IJournal newShadow(final AbstractJournal journal) { - - if (journal.getBufferStrategy() instanceof RWStrategy) { - - return new JournalShadow(journal); - - } else { - - return journal; - - } - - } - -} Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-04 23:20:34 UTC (rev 3899) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-05 15:43:58 UTC (rev 3900) @@ -3228,8 +3228,15 @@ ret = new ContextAllocation(this, m_freeFixed.length, null, context); - m_contexts.put(context, ret); + if (m_contexts.put(context, ret) != null) { + + throw new AssertionError(); + + } +// log.warn("Context: ncontexts=" + m_contexts.size() + ", context=" +// + context); + } return ret; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
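r3900 removes the JournalShadow wrapper and guards the IAllocationContext-aware calls with a plain instanceof check: AbstractJournal only delegates to the context-specific methods when the backing strategy really is an RWStrategy, and otherwise falls back to the ordinary IRawStore calls. One such method, condensed from the diff (assertCanWrite() and _bufferStrategy are the existing AbstractJournal members, assumed here):

    // Context-aware write with a fallback for strategies that ignore contexts.
    public long write(final ByteBuffer data, final IAllocationContext context) {

        assertCanWrite();

        if (_bufferStrategy instanceof RWStrategy) {
            // The RWStore isolates allocations (and frees) per context.
            return ((RWStrategy) _bufferStrategy).write(data, context);
        }

        // Other strategies (e.g., the WORM) simply ignore the context.
        return _bufferStrategy.write(data);
    }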
[Bigdata-commit] SF.net SVN: bigdata:[3912] branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <tho...@us...> - 2010-11-08 15:46:27
Revision: 3912 http://bigdata.svn.sourceforge.net/bigdata/?rev=3912&view=rev Author: thompsonbry Date: 2010-11-08 15:46:21 +0000 (Mon, 08 Nov 2010) Log Message: ----------- javadoc Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-11-08 15:12:25 UTC (rev 3911) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-11-08 15:46:21 UTC (rev 3912) @@ -358,7 +358,7 @@ /* * Note: getIndex() sets the listener on the BTree. That listener is - * reponsible for putting dirty indices onto the commit list. + * responsible for putting dirty indices onto the commit list. */ commitList = new ConcurrentHashMap<String,DirtyListener>(resource.length); @@ -1256,7 +1256,7 @@ * which the operation isolated by that transaction has * requested. Transaction commits are placed into a partial * order to avoid deadlocks where that ordered is determined - * by sorting the resources declared by the tx througout its + * by sorting the resources declared by the tx throughout its * life cycle and the obtaining locks on all of those * resources (in the distributed transaction service) before * the commit may start. This is very similar to how we Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java 2010-11-08 15:12:25 UTC (rev 3911) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java 2010-11-08 15:46:21 UTC (rev 3912) @@ -1373,7 +1373,7 @@ )); /* - * Create the BTree to aborb writes for the target index + * Create the BTree to absorb writes for the target index * partition. The metadata for this BTree was configured above * and is associated with a view that captures all data received * from the source index partition. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
[Bigdata-commit] SF.net SVN: bigdata:[3962] branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <tho...@us...> - 2010-11-18 17:44:59
Revision: 3962 http://bigdata.svn.sourceforge.net/bigdata/?rev=3962&view=rev Author: thompsonbry Date: 2010-11-18 17:44:52 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Integrated the RWStore with the AbstractTransactionService so that it will defer frees if there is an open tx. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-18 17:12:23 UTC (rev 3961) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-18 17:44:52 UTC (rev 3962) @@ -238,6 +238,23 @@ final JournalTransactionService abstractTransactionService = new JournalTransactionService( properties, this) { + + { + + final long lastCommitTime = Journal.this.getLastCommitTime(); + + if (lastCommitTime != 0L) { + + /* + * Notify the transaction service on startup so it can set + * the effective release time based on the last commit time + * for the store. + */ + updateReleaseTimeForBareCommit(lastCommitTime); + + } + + } protected void activateTx(final TxState state) { final IBufferStrategy bufferStrategy = Journal.this.getBufferStrategy(); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-11-18 17:12:23 UTC (rev 3961) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-11-18 17:44:52 UTC (rev 3962) @@ -127,7 +127,7 @@ } - protected long findNextCommitTime(long commitTime) { + protected long findNextCommitTime(final long commitTime) { final ICommitRecord commitRecord = journal.getCommitRecordIndex() .findNext(commitTime); @@ -454,17 +454,20 @@ } - /** - * Always returns ZERO (0L) since history can not be released on the - * {@link Journal}. + /* @todo This is only true for the WORM. For the RWStore, the release time + * will advance normally and things can get aged out of the store. */ - @Override - public long getReleaseTime() { +// /** +// * Always returns ZERO (0L) since history can not be released on the +// * {@link Journal}. +// */ +// @Override +// public long getReleaseTime() { +// +// return 0L; +// +// } - return 0L; - - } - /** * Throws exception since distributed transactions are not used for a single * {@link Journal}. 
Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 17:12:23 UTC (rev 3961) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 17:44:52 UTC (rev 3962) @@ -1442,8 +1442,8 @@ alwaysDefer = context == null && !m_contexts.isEmpty(); if (alwaysDefer) if (log.isDebugEnabled()) - log.debug("Should defer " + physicalAddress(addr)); - if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { + log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); + if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { deferFree(addr, sze); } else { immediateFree(addr, sze); @@ -1505,8 +1505,10 @@ if (alloc == null) { throw new IllegalArgumentException("Invalid address provided to immediateFree: " + addr + ", size: " + sze); } - final long pa = alloc.getPhysicalAddress(addrOffset); - alloc.free(addr, sze); + final long pa = alloc.getPhysicalAddress(addrOffset); + if (log.isTraceEnabled()) + log.trace("Freeing allocation at " + addr + ", physical address: " + pa); + alloc.free(addr, sze); // must clear after free in case is a blobHdr that requires reading! // the allocation lock protects against a concurrent re-allocation // of the address before the cache has been cleared @@ -2038,16 +2040,16 @@ final JournalTransactionService transactionService = (JournalTransactionService) journal .getLocalTransactionManager().getTransactionService(); - // the previous commit point. - long latestReleasableTime = journal.getLastCommitTime(); - - if (latestReleasableTime == 0L) { - // Nothing committed. - return; - } +// // the previous commit point. +// long lastCommitTime = journal.getLastCommitTime(); +// +// if (lastCommitTime == 0L) { +// // Nothing committed. +// return; +// } - // subtract out the retention period. - latestReleasableTime -= transactionService.getMinReleaseAge(); + // the effective release time. + long latestReleasableTime = transactionService.getReleaseTime(); // add one to give this inclusive upper bound semantics. 
latestReleasableTime++; @@ -4219,6 +4221,8 @@ m_allocationLock.lock(); try { m_activeTxCount++; + if(log.isInfoEnabled()) + log.info("#activeTx="+m_activeTxCount); } finally { m_allocationLock.unlock(); } @@ -4228,6 +4232,8 @@ m_allocationLock.lock(); try { m_activeTxCount--; + if(log.isInfoEnabled()) + log.info("#activeTx="+m_activeTxCount); } finally { m_allocationLock.unlock(); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-11-18 17:12:23 UTC (rev 3961) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-11-18 17:44:52 UTC (rev 3962) @@ -1128,9 +1128,11 @@ */ protected void updateReleaseTimeForBareCommit(final long commitTime) { - if(!lock.isHeldByCurrentThread()) - throw new IllegalMonitorStateException(); - +// if(!lock.isHeldByCurrentThread()) +// throw new IllegalMonitorStateException(); + + lock.lock(); + try { synchronized (startTimeIndex) { if (this.releaseTime < (commitTime - 1) @@ -1159,6 +1161,9 @@ } } + } finally { + lock.unlock(); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
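Two small mechanisms carry r3962: the Journal primes the transaction service with its last commit time at startup, so the effective release time is meaningful before the first new commit, and the RWStore counts open transactions under its allocation lock so that frees are deferred while any transaction is active. A sketch of the counting half follows; the method names and the ReentrantLock import are assumptions, since the diff only shows the increment and decrement inside existing methods.

    import java.util.concurrent.locks.ReentrantLock;

    // Active-transaction bookkeeping along the lines of the r3962 diff.
    public class TxCounterSketch {

        private final ReentrantLock m_allocationLock = new ReentrantLock();
        private int m_activeTxCount = 0;

        /** Called when a transaction starts; frees are deferred while the count is non-zero. */
        public void activateTx() {
            m_allocationLock.lock();
            try {
                m_activeTxCount++;
            } finally {
                m_allocationLock.unlock();
            }
        }

        /** Called when a transaction completes. */
        public void deactivateTx() {
            m_allocationLock.lock();
            try {
                m_activeTxCount--;
            } finally {
                m_allocationLock.unlock();
            }
        }
    }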
[Bigdata-commit] SF.net SVN: bigdata:[3966] branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <tho...@us...> - 2010-11-18 18:33:37
Revision: 3966 http://bigdata.svn.sourceforge.net/bigdata/?rev=3966&view=rev Author: thompsonbry Date: 2010-11-18 18:33:30 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Found a fence post in RWStore#checkDeferredFrees(). It needed to add a total of (2) to the release time. +1 since we want to read the delete blocks for the first commit record which we MAY NOT release. +1 for the inclusive upper bound semantics for the range scan of the index. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java 2010-11-18 18:17:22 UTC (rev 3965) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java 2010-11-18 18:33:30 UTC (rev 3966) @@ -247,8 +247,9 @@ * derived from the timestamp of the earliest running transaction MINUS the * minimum release age and is updated whenever the earliest running * transaction terminates. This value is monotonically increasing. It will - * never be GT the last commit time. It will never be negative. It MAY be - * ZERO (0L) and will be ZERO (0L) on startup. + * always be LT the last non-zero last commit time. It will never be + * negative. It MAY be ZERO (0L) and will be ZERO (0L) on startup (unless + * explicitly set by the database to the last known commit time). */ public long getReleaseTime() throws IOException; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-11-18 18:17:22 UTC (rev 3965) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-11-18 18:33:30 UTC (rev 3966) @@ -31,9 +31,7 @@ import java.io.IOException; import java.util.Properties; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import com.bigdata.service.AbstractFederation; import com.bigdata.service.AbstractTransactionService; @@ -444,15 +442,18 @@ } - /** - * Ignored since the {@link Journal} records the last commit time - * in its root blocks. + /* @todo This is only true for the WORM. For the RWStore, the release time + * will advance normally and things can get aged out of the store. */ - public void notifyCommit(long commitTime) { - - // NOP - - } +// /** +// * Ignored since the {@link Journal} records the last commit time +// * in its root blocks. +// */ +// public void notifyCommit(long commitTime) { +// +// // NOP +// +// } /* @todo This is only true for the WORM. For the RWStore, the release time * will advance normally and things can get aged out of the store. @@ -501,42 +502,42 @@ } - /** - * Invoke a method with the {@link AbstractTransactionService}'s lock held. 
- * - * @param <T> - * @param callable - * @return - * @throws Exception - */ - public <T> T callWithLock(final Callable<T> callable) throws Exception { - lock.lock(); - try { - return callable.call(); - } finally { - lock.unlock(); - } - } +// /** +// * Invoke a method with the {@link AbstractTransactionService}'s lock held. +// * +// * @param <T> +// * @param callable +// * @return +// * @throws Exception +// */ +// public <T> T callWithLock(final Callable<T> callable) throws Exception { +// lock.lock(); +// try { +// return callable.call(); +// } finally { +// lock.unlock(); +// } +// } +// +// /** +// * Invoke a method with the {@link AbstractTransactionService}'s lock held. +// * +// * But throw immediate exception if try fails. +// * +// * @param <T> +// * @param callable +// * @return +// * @throws Exception +// */ +// public <T> T tryCallWithLock(final Callable<T> callable, long waitFor, TimeUnit unit) throws Exception { +// if (!lock.tryLock(waitFor,unit)) { +// throw new RuntimeException("Lock not available"); +// } +// try { +// return callable.call(); +// } finally { +// lock.unlock(); +// } +// } - /** - * Invoke a method with the {@link AbstractTransactionService}'s lock held. - * - * But throw immediate exception if try fails. - * - * @param <T> - * @param callable - * @return - * @throws Exception - */ - public <T> T tryCallWithLock(final Callable<T> callable, long waitFor, TimeUnit unit) throws Exception { - if (!lock.tryLock(waitFor,unit)) { - throw new RuntimeException("Lock not available"); - } - try { - return callable.call(); - } finally { - lock.unlock(); - } - } - } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 18:17:22 UTC (rev 3965) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 18:33:30 UTC (rev 3966) @@ -216,6 +216,9 @@ * (but not yet committed). * <p> * Read-only mode. + * <p> + * Unit tests looking for persistent memory leaks (e.g., all allocated + * space can be reclaimed). */ public class RWStore implements IStore { @@ -2051,10 +2054,20 @@ // the effective release time. long latestReleasableTime = transactionService.getReleaseTime(); - // add one to give this inclusive upper bound semantics. + /* + * add one because we want to read the delete blocks for all commit + * points up to and including the first commit point that we may not + * release. + */ latestReleasableTime++; /* + * add one to give this inclusive upper bound semantics to the range + * scan. + */ + latestReleasableTime++; + + /* * Free deferrals. * * Note: This adds one to the lastDeferredReleaseTime to give Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-11-18 18:17:22 UTC (rev 3965) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-11-18 18:33:30 UTC (rev 3966) @@ -1120,7 +1120,7 @@ * (commitTime-1) then compute and set the new releaseTime. * <p> * Note: This method was historically part of {@link #notifyCommit(long)}. 
- * It was moved into its own method so it can be overriden for some unit + * It was moved into its own method so it can be overridden for some unit * tests. * * @throws IllegalMonitorStateException This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
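The fence post fixed in r3966 is easiest to follow as arithmetic. Assume, purely for illustration, that the transaction service reports a release time of 1000:

    // Worked example of the r3966 adjustment; 1000 is an illustrative value.
    public class FencePostSketch {
        public static void main(final String[] args) {
            long latestReleasableTime = 1000L; // transactionService.getReleaseTime()

            // +1: read the delete blocks for all commit points up to and including
            // the first commit point that we may NOT release (per the diff comment).
            latestReleasableTime++;            // 1001

            // +1: give the index range scan inclusive upper bound semantics.
            latestReleasableTime++;            // 1002

            System.out.println(latestReleasableTime); // upper bound used for the scan
        }
    }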
[Bigdata-commit] SF.net SVN: bigdata:[4014] branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata
From: <tho...@us...> - 2010-12-19 19:05:29
Revision: 4014 http://bigdata.svn.sourceforge.net/bigdata/?rev=4014&view=rev Author: thompsonbry Date: 2010-12-19 19:05:23 +0000 (Sun, 19 Dec 2010) Log Message: ----------- Disabled the session optimization in RWStore per https://sourceforge.net/apps/trac/bigdata/ticket/214. Martyn is looking into this issue. Modified TemporaryStore per https://sourceforge.net/apps/trac/bigdata/ticket/215 to support incremental truth maintenance in combination with the RWStore. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/TemporaryStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/TemporaryStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/TemporaryStore.java 2010-12-18 14:31:23 UTC (rev 4013) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/TemporaryStore.java 2010-12-19 19:05:23 UTC (rev 4014) @@ -350,10 +350,21 @@ } - throw new UnsupportedOperationException( - "Not supported: timestamp=" - + TimestampUtility.toString(timestamp)); + /* + * FIXME The RWStore uses a read-only transaction to protect against + * recycling of the B+Tree revisions associated with the commit point + * on which it is reading. The temporary store only supports unisolated + * reads, so this is just ignoring the tx specified by the mutation rule + * for reading on the temporary store and going with the unisolated index + * anyway. See https://sourceforge.net/apps/trac/bigdata/ticket/215. + */ +// throw new UnsupportedOperationException( +// "Not supported: timestamp=" +// + TimestampUtility.toString(timestamp)); + + return getIndex(name); + } public SparseRowStore getGlobalRowStore() { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-18 14:31:23 UTC (rev 4013) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-19 19:05:23 UTC (rev 4014) @@ -1589,7 +1589,7 @@ * FIXME We need unit test when MIN_RELEASE_AGE is ZERO AND * there are open read-only transactions. */ - if (m_minReleaseAge == 0) { + if (false&&m_minReleaseAge == 0) { /* * The session protection is complicated by the mix of * transaction protection and isolated AllocationContexts. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
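r4014 backs the session optimization out of RWStore without deleting it: the guard is short-circuited with a constant, so the branch stays compilable and is trivial to re-enable once ticket 214 is resolved. A minimal illustration of that idiom; the field and its value are assumptions for the sketch, not taken from the diff.

    // Short-circuit disable of a suspect code path, as in the r4014 diff.
    public class DisableSketch {

        private final long m_minReleaseAge = 0L; // assumed value for illustration

        void maybeUseSessionProtection() {
            if (false && m_minReleaseAge == 0) {
                // Session-protection fast path, disabled pending trac ticket 214.
            } else {
                // Conservative path: rely on deferred frees and tx protection.
            }
        }
    }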