From: <mar...@us...> - 2010-09-01 14:19:14
|
Revision: 3477 http://bigdata.svn.sourceforge.net/bigdata/?rev=3477&view=rev Author: martyncutcher Date: 2010-09-01 14:19:05 +0000 (Wed, 01 Sep 2010) Log Message: ----------- Commit initial implementations of shadow allocations and CommitRecord-based deleteBlocks Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAtomicStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/AbstractRawStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAllocationContext.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalShadow.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/btree/Node.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -862,12 +862,13 @@ // Clear the old key. data.childAddr[i] = NULL; - if(btree.storeCache!=null) { + if (btree.storeCache!=null) { // remove from cache. btree.storeCache.remove(oldChildAddr); } // free the oldChildAddr if the Strategy supports it - btree.store.delete(oldChildAddr); + if (true) btree.store.delete(oldChildAddr); + // System.out.println("Deleting " + oldChildAddr); // Stash reference to the new child. 
// childRefs[i] = btree.newRef(newChild); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -1436,7 +1436,7 @@ final WriteCache cache = acquireForWriter(); try { - debugAddrs(offset, 0, 'A'); + debugAddrs(offset, data.remaining(), 'A'); // write on the cache. if (cache.write(offset, data, chk, useChecksum)) { @@ -1982,7 +1982,7 @@ } if (addrsUsed[i] == paddr) { ret.append(addrActions[i]); - if (addrActions[i]=='W') { + if (addrActions[i]=='A') { ret.append("[" + addrLens[i] + "]"); } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -677,4 +677,8 @@ public void setTransactionManager(AbstractLocalTransactionManager localTransactionManager) { // NOP } + + public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { + // NOP + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -37,6 +37,7 @@ import java.nio.ByteBuffer; import 
java.nio.channels.Channel; import java.nio.channels.FileChannel; +import java.util.Iterator; import java.util.Properties; import java.util.UUID; import java.util.concurrent.Callable; @@ -222,6 +223,18 @@ public static transient final int ROOT_NAME2ADDR = 0; /** + * The index of the address where the root block copy from the previous + * commit is stored + */ + public static transient final int PREV_ROOTBLOCK = 1; + + /** + * The index of the address of the delete blocks associated with + * this transaction + */ + public static transient final int DELETEBLOCK = 2; + + /** * A clone of the properties used to initialize the {@link Journal}. */ final protected Properties properties; @@ -520,6 +533,73 @@ } /** + * Return the root block view associated with the commitRecord for the + * provided commit time. This requires accessing the next commit record + * since the previous root block is stored with each record. + * + * @param commitTime + * A commit time. + * + * @return The root block view -or- <code>null</code> if there is no commit + * record for that commitTime. 
+ * + */ + public IRootBlockView getRootBlock(final long commitTime) { + + final ICommitRecord commitRecord = getCommitRecordIndex().findNext(commitTime); + + if (commitRecord == null) { + return null; + } + + final long rootBlockAddr = commitRecord.getRootAddr(PREV_ROOTBLOCK); + + if (rootBlockAddr == 0) { + return null; + } else { + ByteBuffer bb = read(rootBlockAddr); + + return new RootBlockView(true /* rb0 - WTH */, bb, checker); + } + + } + + /** + * + * @param startTime from which to begin iteration + * + * @return an iterator over the committed root blocks + */ + public Iterator<IRootBlockView> getRootBlocks(final long startTime) { + return new Iterator<IRootBlockView>() { + ICommitRecord commitRecord = getCommitRecordIndex().findNext(startTime); + + public boolean hasNext() { + return commitRecord != null; + } + + public IRootBlockView next() { + final long rootBlockAddr = commitRecord.getRootAddr(PREV_ROOTBLOCK); + + commitRecord = getCommitRecordIndex().findNext(commitRecord.getTimestamp()); + + if (rootBlockAddr == 0) { + return null; + } else { + ByteBuffer bb = read(rootBlockAddr); + + return new RootBlockView(true /* rb0 - WTH */, bb, checker); + } + } + + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + + /** * True iff the journal was opened in a read-only mode. */ private final boolean readOnly; @@ -905,6 +985,8 @@ _bufferStrategy = new RWStrategy(fileMetadata, quorum); this._rootBlock = fileMetadata.rootBlock; + + setCommitter(DELETEBLOCK, new DeleteBlockCommitter((RWStrategy) _bufferStrategy)); break; @@ -961,6 +1043,8 @@ // report event. ResourceManager.openJournal(getFile() == null ? null : getFile().toString(), size(), getBufferStrategy() .getBufferMode()); + + this._bufferStrategy.setCommitRecordIndex(_commitRecordIndex); } finally { @@ -2017,6 +2101,8 @@ // clear reference and reload from the store. 
_commitRecordIndex = _getCommitRecordIndex(); + + _bufferStrategy.setCommitRecordIndex(_commitRecordIndex); // clear the array of committers. _committers = new ICommitter[_committers.length]; @@ -2509,7 +2595,7 @@ public long write(final ByteBuffer data) { - assertCanRead(); + assertCanWrite(); return _bufferStrategy.write(data); @@ -2524,7 +2610,15 @@ } - // Note: NOP for WORM. Used by RW for eventual recycle protocol. + public long write(ByteBuffer data, final long oldAddr, IAllocationContext context) { + return _bufferStrategy.write(data, oldAddr, context); + } + + public long write(ByteBuffer data, IAllocationContext context) { + return _bufferStrategy.write(data, context); + } + + // Note: NOP for WORM. Used by RW for eventual recycle protocol. public void delete(final long addr) { assertCanWrite(); @@ -2533,6 +2627,18 @@ } + public void delete(final long addr, IAllocationContext context) { + + assertCanWrite(); + + _bufferStrategy.delete(addr, context); + + } + + public void detachContext(IAllocationContext context) { + _bufferStrategy.detachContext(context); + } + final public long getRootAddr(final int index) { final ReadLock lock = _fieldReadWriteLock.readLock(); @@ -2660,6 +2766,11 @@ */ setupName2AddrBTree(getRootAddr(ROOT_NAME2ADDR)); + + /** + * Register committer to write previous root block + */ + setCommitter(PREV_ROOTBLOCK, new RootBlockCommitter(this)); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -34,6 +34,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.UUID; @@ 
-2483,10 +2484,33 @@ } public void delete(long addr) { - // void - + delegate.delete(addr); } + public IRootBlockView getRootBlock(long commitTime) { + return delegate.getRootBlock(commitTime); + } + + public Iterator<IRootBlockView> getRootBlocks(long startTime) { + return delegate.getRootBlocks(startTime); + } + + public void delete(long addr, IAllocationContext context) { + delegate.delete(addr, context); + } + + public long write(ByteBuffer data, IAllocationContext context) { + return delegate.write(data, context); + } + + public long write(ByteBuffer data, long oldAddr, IAllocationContext context) { + return delegate.write(data, oldAddr, context); + } + + public void detachContext(IAllocationContext context) { + delegate.detachContext(context); + } + } /** @@ -2861,6 +2885,30 @@ } + public IRootBlockView getRootBlock(long commitTime) { + return delegate.getRootBlock(commitTime); + } + + public Iterator<IRootBlockView> getRootBlocks(long startTime) { + return delegate.getRootBlocks(startTime); + } + + public void delete(long addr, IAllocationContext context) { + throw new UnsupportedOperationException(); + } + + public long write(ByteBuffer data, IAllocationContext context) { + throw new UnsupportedOperationException(); + } + + public long write(ByteBuffer data, long oldAddr, IAllocationContext context) { + throw new UnsupportedOperationException(); + } + + public void detachContext(IAllocationContext context) { + delegate.detachContext(context); + } + } /** Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -26,6 +26,8 @@ import java.io.IOException; import java.io.ObjectInput; import 
java.io.ObjectOutput; +import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.UUID; import com.bigdata.btree.BTree; @@ -696,4 +698,34 @@ } // CommitRecordIndexTupleSerializer + public Iterator<ICommitRecord> getCommitRecords(final long fromTime, final long toTime) { + return new Iterator<ICommitRecord>() { + ICommitRecord m_next = findNext(fromTime); + + public boolean hasNext() { + return m_next != null; + } + + public ICommitRecord next() { + if (m_next == null) { + throw new NoSuchElementException(); + } + + ICommitRecord ret = m_next; + m_next = findNext(ret.getTimestamp()); + + if (m_next != null && m_next.getTimestamp() > toTime) { + m_next = null; + } + + return ret; + } + + public void remove() { + throw new RuntimeException("Invalid Operation"); + } + + }; + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -2559,5 +2559,9 @@ public void setNextOffset(long lastOffset) { // void for standard Disk strategy } + + public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { + // NOP + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -1245,8 +1245,8 @@ } - if (log.isDebugEnabled()) - log.debug("Writing ROOTBLOCK with commitCounter: " + rootBlock.getCommitCounter() + if (log.isTraceEnabled()) + 
log.trace("Writing ROOTBLOCK with commitCounter: " + rootBlock.getCommitCounter() + ", commitRecordIndexAddr: " + rootBlock.getCommitRecordIndexAddr() + ", commitRecordAddr: " + rootBlock.getCommitRecordAddr()); } Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAllocationContext.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAllocationContext.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAllocationContext.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -0,0 +1,41 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.journal; + +/** + * An IAllocationContext defines a shadow environment which may be + * associated with allocations made during a transaction. 
+ * + * @author Martyn Cutcher + * + */ +public interface IAllocationContext extends Comparable { + + /** + * @return the minimum release time for any freed allocations + */ + long minimumReleaseTime(); + +} Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAllocationContext.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAtomicStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAtomicStore.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IAtomicStore.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -27,6 +27,8 @@ package com.bigdata.journal; +import java.util.Iterator; + import com.bigdata.rawstore.IRawStore; /** @@ -111,4 +113,24 @@ */ public ICommitRecord getCommitRecord(long timestamp); + /** + * Return the root block view associated with the commitRecord for the + * provided commit time. This requires accessing the next commit record + * since it is the previous root block that is referenced from each record. + * + * @param commitTime + * A commit time. + * + * @return The root block view -or- <code>null</code> if there is no commit + * record for that commitTime. 
+ */ + public IRootBlockView getRootBlock(final long commitTime); + + /** + * + * @param startTime from which to begin iteration + * + * @return an iterator over the committed root blocks + */ + public Iterator<IRootBlockView> getRootBlocks(final long startTime); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -270,4 +270,17 @@ */ public void setTransactionManager(AbstractLocalTransactionManager localTransactionManager); + + /** + * Needed to enable transaction support for standalone buffer strategies. + * + * The WORMStrategy does not need this since no data is ever deleted, but + * the RWStrategy must manage deletions and needs access to the historical + * commitRecords which reference the blocks of deferred deleted addresses. + * + * @param commitRecordIndex + * The CommitRecordIndex for the owning Journal + */ + public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex); + } Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -0,0 +1,245 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... 
+ +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.journal; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutorService; + +import com.bigdata.bfs.BigdataFileSystem; +import com.bigdata.btree.BTree; +import com.bigdata.btree.IIndex; +import com.bigdata.btree.IndexMetadata; +import com.bigdata.counters.CounterSet; +import com.bigdata.mdi.IResourceMetadata; +import com.bigdata.relation.locator.IResourceLocator; +import com.bigdata.sparse.SparseRowStore; + +public class JournalDelegate implements IJournal { + final IJournal delegate; + + public JournalDelegate(final IJournal source) { + this.delegate = source; + } + + public Properties getProperties() { + return delegate.getProperties(); + } + + public void shutdown() { + delegate.shutdown(); + } + + public void shutdownNow() { + delegate.shutdownNow(); + } + + public void abort() { + delegate.abort(); + } + + public long commit() { + return delegate.commit(); + } + + public ICommitRecord getCommitRecord(long timestamp) { + return delegate.getCommitRecord(timestamp); + } + + public long getRootAddr(int index) { + return delegate.getRootAddr(index); + } + + public IRootBlockView getRootBlock(long commitTime) { + return delegate.getRootBlock(commitTime); + } + + public IRootBlockView 
getRootBlockView() { + return delegate.getRootBlockView(); + } + + public Iterator<IRootBlockView> getRootBlocks(long startTime) { + return delegate.getRootBlocks(startTime); + } + + public void setCommitter(int index, ICommitter committer) { + delegate.setCommitter(index, committer); + } + + public void close() { + delegate.close(); + } + + public void delete(long addr) { + delegate.delete(addr); + } + + public void deleteResources() { + delegate.deleteResources(); + } + + public void destroy() { + delegate.destroy(); + } + + public void force(boolean metadata) { + delegate.force(metadata); + } + + public CounterSet getCounters() { + return delegate.getCounters(); + } + + public File getFile() { + return delegate.getFile(); + } + + public IResourceMetadata getResourceMetadata() { + return delegate.getResourceMetadata(); + } + + public UUID getUUID() { + return delegate.getUUID(); + } + + public boolean isFullyBuffered() { + return delegate.isFullyBuffered(); + } + + public boolean isOpen() { + return delegate.isOpen(); + } + + public boolean isReadOnly() { + return delegate.isOpen(); + } + + public boolean isStable() { + return delegate.isStable(); + } + + public ByteBuffer read(long addr) { + return delegate.read(addr); + } + + public long size() { + return delegate.size(); + } + + public long write(ByteBuffer data) { + return delegate.write(data); + } + + public long write(ByteBuffer data, long oldAddr) { + return delegate.write(data, oldAddr); + } + + public int getByteCount(long addr) { + return delegate.getByteCount(addr); + } + + public long getOffset(long addr) { + return delegate.getOffset(addr); + } + + public long toAddr(int nbytes, long offset) { + return delegate.toAddr(nbytes, offset); + } + + public String toString(long addr) { + return delegate.toString(addr); + } + + public IIndex getIndex(String name) { + return delegate.getIndex(name); + } + + public IIndex registerIndex(String name, BTree btree) { + return delegate.registerIndex(name, btree); + 
} + + public IIndex registerIndex(String name, IndexMetadata indexMetadata) { + return delegate.registerIndex(name, indexMetadata); + } + + public void dropIndex(String name) { + delegate.dropIndex(name); + } + + public void registerIndex(IndexMetadata indexMetadata) { + delegate.registerIndex(indexMetadata); + } + + public ExecutorService getExecutorService() { + return delegate.getExecutorService(); + } + + public BigdataFileSystem getGlobalFileSystem() { + return delegate.getGlobalFileSystem(); + } + + public SparseRowStore getGlobalRowStore() { + return delegate.getGlobalRowStore(); + } + + public IIndex getIndex(String name, long timestamp) { + return delegate.getIndex(name, timestamp); + } + + public long getLastCommitTime() { + return delegate.getLastCommitTime(); + } + + public IResourceLocator getResourceLocator() { + return delegate.getResourceLocator(); + } + + public IResourceLockService getResourceLockService() { + return delegate.getResourceLockService(); + } + + public TemporaryStore getTempStore() { + return delegate.getTempStore(); + } + + public void delete(long addr, IAllocationContext context) { + delegate.delete(addr, context); + } + + public long write(ByteBuffer data, IAllocationContext context) { + return delegate.write(data, context); + } + + public long write(ByteBuffer data, long oldAddr, IAllocationContext context) { + return delegate.write(data, oldAddr, context); + } + + public void detachContext(IAllocationContext context) { + delegate.detachContext(context); + } +} Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalDelegate.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalShadow.java =================================================================== --- 
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalShadow.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalShadow.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -0,0 +1,85 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.journal; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A JournalShadow wraps a Journal as a JournalDelegate but provides itself + * as the allocation context to be passed through to any interested + * BufferStrategy. 
+ * + * This is the path by which RWStore allocators are provided the context for + * the allocations and deletes made + * + * @author Martyn Cutcher + * + */ +public class JournalShadow extends JournalDelegate implements IAllocationContext { + static AtomicLong s_idCounter = new AtomicLong(23); + int m_id = (int) s_idCounter.incrementAndGet(); + + public JournalShadow(IJournal source) { + super(source); + } + + public long write(ByteBuffer data) { + return delegate.write(data, this); + } + + public long write(ByteBuffer data, long oldAddr) { + return delegate.write(data, oldAddr, this); + } + + public void delete(long oldAddr) { + delegate.delete(oldAddr, this); + } + + public int compareTo(Object o) { + if (o instanceof JournalShadow) { + JournalShadow js = (JournalShadow) o; + return m_id - js.m_id; + } else { + return -1; + } + } + + /** + * TODO: should retrieve from localTransactionService or Journal + * properties + */ + public long minimumReleaseTime() { + return 0; + } + + /** + * Release itself from the wrapped Journal, this unlocks the allocator for + * the RWStore + */ + public void detach() { + delegate.detachContext(this); + } +} Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalShadow.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import 
java.util.concurrent.TimeUnit; import com.bigdata.service.AbstractFederation; import com.bigdata.service.AbstractTransactionService; @@ -514,4 +515,25 @@ } } + /** + * Invoke a method with the {@link AbstractTransactionService}'s lock held. + * + * But throw immediate exception if try fails. + * + * @param <T> + * @param callable + * @return + * @throws Exception + */ + public <T> T tryCallWithLock(final Callable<T> callable, long waitFor, TimeUnit unit) throws Exception { + if (!lock.tryLock(waitFor,unit)) { + throw new RuntimeException("Lock not available"); + } + try { + return callable.call(); + } finally { + lock.unlock(); + } + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -81,6 +81,8 @@ final private FileMetadataView m_fmv = new FileMetadataView(); + private volatile boolean m_open = false; + private volatile IRootBlockView m_rb; private volatile IRootBlockView m_rb0; private volatile IRootBlockView m_rb1; @@ -110,6 +112,7 @@ m_rb = fileMetadata.rootBlock; m_store = new RWStore(m_fmv, false, quorum); // not read-only for now + m_open = true; m_rb0 = copyRootBlock(true); m_rb1 = copyRootBlock(false); @@ -233,6 +236,10 @@ } public long write(ByteBuffer data) { + return write(data, null); + } + + public long write(ByteBuffer data, IAllocationContext context) { checkReopen(); if (data == null) { @@ -246,7 +253,7 @@ } try { - long rwaddr = m_store.alloc(data.array(), nbytes); + long rwaddr = m_store.alloc(data.array(), nbytes, context); data.position(nbytes); // update position to end of buffer long retaddr = encodeAddr(rwaddr, nbytes); @@ -296,30 +303,27 @@ return (int) (addr & 0xFFFFFFFF); } + public void 
delete(long addr) { + if (true) delete(addr, null); + } + /** * Must check whether there are existing transactions which may access * this data, and if not free immediately, otherwise defer. */ - public void delete(long addr) { - final JournalTransactionService service = (JournalTransactionService) (localTransactionManager == null ? null - : localTransactionManager.getTransactionService()); + public void delete(long addr, IAllocationContext context) { final int rwaddr = decodeAddr(addr); final int sze = decodeSize(addr); - // FIXME: need to decide on correct way to handle transaction oriented - // allocations - if (true || service == null) { - m_store.free(rwaddr, sze); - } else { - /* - * May well be better to always defer and then free in batch, - * but for now need to confirm transaction logic - */ - m_store.deferFree(rwaddr, sze, m_rb.getLastCommitTime()); - } + m_store.free(rwaddr, sze, context); } + + public void detachContext(IAllocationContext context) { + m_store.detachContext(context); + } + public static class RWAddressManager implements IAddressManager { public int getByteCount(long addr) { @@ -428,8 +432,8 @@ try { m_store.checkRootBlock(rootBlock); - if (log.isInfoEnabled()) { - log.info("Writing new rootblock with commitCounter: " + if (log.isTraceEnabled()) { + log.trace("Writing new rootblock with commitCounter: " + rootBlock.getCommitCounter() + ", commitRecordAddr: " + rootBlock.getCommitRecordAddr() + ", commitRecordIndexAddr: " + rootBlock.getCommitRecordIndexAddr()); @@ -461,6 +465,8 @@ throw new IllegalStateException(); } try { + m_open = false; + m_store.close(); m_fileMetadata.raf.close(); m_fileMetadata.raf = null; @@ -563,6 +569,7 @@ m_fileMetadata.raf = new RandomAccessFile(m_fileMetadata.file, m_fileMetadata.fileMode); m_store = new RWStore(m_fmv, false, m_environment); // never read-only for now m_needsReopen = false; + m_open = true; } catch (Throwable t) { t.printStackTrace(); @@ -593,7 +600,8 @@ } public boolean isOpen() { - return 
m_fileMetadata.raf != null && m_fileMetadata.raf.getChannel().isOpen(); + // return m_fileMetadata.raf != null && m_fileMetadata.raf.getChannel().isOpen(); + return m_open; } public boolean isReadOnly() { @@ -714,4 +722,25 @@ m_store.setTransactionService((JournalTransactionService) localTransactionManager.getTransactionService()); } + public long getPhysicalAddress(long addr) { + int rwaddr = decodeAddr(addr); + + return m_store.physicalAddress(rwaddr); + } + + /** + * Saves the current list of delete blocks, returning the address allocated. + * This can be used later to retrieve the addresses of allocations to be + * freed. + * + * @return the address of the delete blocks, or zero if none + */ + public long saveDeleteBlocks() { + return m_store.saveDeferrals(); + } + + public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { + m_store.setCommitRecordIndex(commitRecordIndex); + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -2240,4 +2240,8 @@ } + public void setCommitRecordIndex(CommitRecordIndex commitRecordIndex) { + // NOP + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/AbstractRawStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/AbstractRawStore.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/AbstractRawStore.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import com.bigdata.LRUNexus; +import com.bigdata.journal.IAllocationContext; /** * Abstract base class for 
{@link IRawStore} implementations. This class uses a @@ -76,4 +77,23 @@ public void delete(long addr) { // NOP. } + + public void delete(long addr, IAllocationContext context) { + delete(addr); + } + + public long write(ByteBuffer data, IAllocationContext context) { + return write(data); + } + + public long write(ByteBuffer data, long oldAddr, IAllocationContext context) { + return write(data, oldAddr); + } + + /** + * The default implementation is a NOP. + */ + public void detachContext(IAllocationContext context) { + // NOP + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rawstore/IRawStore.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -37,6 +37,7 @@ import com.bigdata.counters.CounterSet; import com.bigdata.io.IByteArrayBuffer; import com.bigdata.journal.AbstractJournal; +import com.bigdata.journal.IAllocationContext; import com.bigdata.mdi.IResourceMetadata; /** @@ -124,6 +125,7 @@ public long write(ByteBuffer data); /** + * Write the data (unisolated). * * @param data * The data. The bytes from the current @@ -133,6 +135,43 @@ * {@link ByteBuffer#limit()} . The caller may subsequently * modify the contents of the buffer without changing the state * of the store (i.e., the data are copied into the store). + * + * @param context defines the shadow AllocationContext from which this call + * was made + * + * @return A long integer formed that encodes both the offset from which the + * data may be read and the #of bytes to be read. See + * {@link IAddressManager}. + * + * @throws IllegalArgumentException + * if <i>data</i> is <code>null</code>. + * @throws IllegalArgumentException + * if <i>data</i> has zero bytes {@link ByteBuffer#remaining()}.
+ * @throws IllegalStateException + * if the store is not open. + * @throws IllegalStateException + * if the store does not allow writes. + * + * @todo define exception if the maximum extent would be exceeded. + * + * @todo the addresses need to reflect the ascending offset at which the + * data are written, at least for a class of append only store. some + * stores, such as the Journal, also have an offset from the start of + * the file to the start of the data region (in the case of the + * Journal it is used to hold the root blocks). + */ + public long write(ByteBuffer data, IAllocationContext context); + + /** + * + * @param data + * The data. The bytes from the current + * {@link ByteBuffer#position()} to the + * {@link ByteBuffer#limit()} will be written and the + * {@link ByteBuffer#position()} will be advanced to the + * {@link ByteBuffer#limit()} . The caller may subsequently + * modify the contents of the buffer without changing the state + * of the store (i.e., the data are copied into the store). * @param oldAddr as returned from a previous write of the same object, or zero if a new write * * @return A long integer formed that encodes both the offset from which the @@ -141,6 +180,25 @@ */ public long write(ByteBuffer data, long oldAddr); + /** + * + * @param data + * The data. The bytes from the current + * {@link ByteBuffer#position()} to the + * {@link ByteBuffer#limit()} will be written and the + * {@link ByteBuffer#position()} will be advanced to the + * {@link ByteBuffer#limit()} . The caller may subsequently + * modify the contents of the buffer without changing the state + * of the store (i.e., the data are copied into the store). + * @param oldAddr as returned from a previous write of the same object, or zero if a new write + * @param context defines the shadow AllocationContext from which this call is made + * + * @return A long integer formed that encodes both the offset from which the + * data may be read and the #of bytes to be read. 
See + * {@link IAddressManager}. + */ + public long write(ByteBuffer data, long oldAddr, IAllocationContext context); + /** * Delete the data (unisolated). * <p> @@ -168,6 +226,48 @@ public void delete(long addr); /** + * Delete the data (unisolated). + * <p> + * After this operation subsequent reads on the address MAY fail and the + * caller MUST NOT depend on the ability to read at that address. + * + * @param addr + * A long integer formed using {@link Addr} that encodes both the + * offset at which the data was written and the #of bytes that + * were written. + * + * @param context + * Defines the shadow AllocationContext from which this call is + * made. For RWStore this can be used to immediately free the + * allocation if it can be determined to have originally + * been requested from the same context. + * + * @exception IllegalArgumentException + * If the address is known to be invalid (never written or + * deleted). Note that the address 0L is always invalid. + * + * It is only applicable in the + * context of a garbage collection strategy. With an append only + * store and with eviction of btrees into index segments there + * is no reason to delete anything on the store - and nothing to + * keep track of the delete. + * + * However, with a Read-Write store it is a requirement, and a void + * implementation is provided for other stores. + */ + public void delete(long addr, IAllocationContext context); + + /** + * + * @param context + * Defines the shadow AllocationContext that may have been used + * to allocate or delete storage. The RWStore assigns + * Allocation areas to specific contexts and these must be + * released for use by others. + */ + public void detachContext(IAllocationContext context); + + /** * Read the data (unisolated).
* * @param addr Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -43,8 +43,10 @@ import com.bigdata.journal.ConcurrencyManager; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.IIndexStore; +import com.bigdata.journal.IJournal; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; +import com.bigdata.journal.JournalShadow; import com.bigdata.journal.TimestampUtility; import com.bigdata.relation.IMutableRelation; import com.bigdata.relation.accesspath.ChunkConsumerIterator; @@ -503,6 +505,11 @@ * the mutation task will read. */ tx = jnl.newTx(lastCommitTime); + + /* + * Create the shadow journal to define the allocation context + */ + indexManager = new JournalShadow(jnl); // the timestamp that we will read on for this step. 
joinNexusFactory.setReadTimestamp(TimestampUtility Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -20,13 +20,14 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ + */ package com.bigdata.rwstore; import java.util.ArrayList; import com.bigdata.io.writecache.WriteCacheService; +import com.bigdata.journal.IAllocationContext; /** * Bit maps for an allocator. The allocator is a bit map managed as int[]s. @@ -58,6 +59,11 @@ */ int m_commit[]; /** + * If used as a shadow allocator, then the _commit is saved to m_saveCommit + * and m_transients is copied to m_commit. + */ + int m_saveCommit[]; + /** * Just the newly allocated bits. This will be copied onto {@link #m_commit} * when the current native transaction commits. 
*/ @@ -73,113 +79,152 @@ */ private final RWWriteCacheService m_writeCache; - AllocBlock(final int addrIsUnused, final int bitSize, final RWWriteCacheService cache) { - m_writeCache = cache; - m_ints = bitSize; - m_commit = new int[bitSize]; - m_bits = new int[bitSize]; - m_transients = new int[bitSize]; - } + AllocBlock(final int addrIsUnused, final int bitSize, final RWWriteCacheService cache) { + m_writeCache = cache; + m_ints = bitSize; + m_commit = new int[bitSize]; + m_bits = new int[bitSize]; + m_transients = new int[bitSize]; + } - public boolean verify(final int addr, final int size) { - if (addr < m_addr || addr >= (m_addr + (size * 32 * m_ints))) { - return false; - } + public boolean verify(final int addr, final int size) { + if (addr < m_addr || addr >= (m_addr + (size * 32 * m_ints))) { + return false; + } // Now check to see if it allocated - final int bit = (addr - m_addr) / size; + final int bit = (addr - m_addr) / size; - return RWStore.tstBit(m_bits, bit); - } + return RWStore.tstBit(m_bits, bit); + } - public boolean addressInRange(final int addr, final int size) { - return (addr >= m_addr && addr <= (m_addr + (size * 32 * m_ints))); - } - - public boolean free(final int addr, final int size) { - if (addr < m_addr || addr >= (m_addr + (size * 32 * m_ints))) { - return false; - } + public boolean addressInRange(final int addr, final int size) { + return (addr >= m_addr && addr <= (m_addr + (size * 32 * m_ints))); + } - freeBit((addr - m_addr) / size, addr); + public boolean free(final int addr, final int size) { + if (addr < m_addr || addr >= (m_addr + (size * 32 * m_ints))) { + return false; + } - return true; - } + freeBit((addr - m_addr) / size); - public boolean freeBit(final int bit, final long addr) { - // Allocation optimization - if bit NOT set in committed memory then clear - // the transient bit to permit reallocation within this transaction. 
- // - // Note that with buffered IO there is also an opportunity to avoid output to - // the file by removing any pending write to the now freed address. On large - // transaction scopes this may be significant. - RWStore.clrBit(m_bits, bit); - - if (!RWStore.tstBit(m_commit, bit)) { - // Should not be cleared here! - // m_writeCache.clearWrite(addr); + return true; + } - RWStore.clrBit(m_transients, bit); - - return true; - } else { - return false; - } - } + public boolean freeBit(final int bit) { + if (!RWStore.tstBit(m_bits, bit)) { + throw new IllegalArgumentException("Freeing bit not set"); + } + + // Allocation optimization - if bit NOT set in committed memory then + // clear + // the transient bit to permit reallocation within this transaction. + // + // Note that with buffered IO there is also an opportunity to avoid + // output to + // the file by removing any pending write to the now freed address. On + // large + // transaction scopes this may be significant. + RWStore.clrBit(m_bits, bit); - public int alloc(final int size) { - if (size < 0) { - throw new Error("Storage allocation error : negative size passed"); - } + if (!RWStore.tstBit(m_commit, bit)) { + RWStore.clrBit(m_transients, bit); - final int bit = RWStore.fndBit(m_transients, m_ints); + return true; + } else { + return false; + } + } - if (bit != -1) { - RWStore.setBit(m_bits, bit); - RWStore.setBit(m_transients, bit); + /** + * The shadow, if non-null defines the context for this request. + * + * If an existing shadow is registered, then the allocation fails + * immediately. + * + * If no existing shadow is registered, and a new allocation can be made + * then this AllocBlock is registered with the shadow. + * + * Note that when shadows are used, an allocator on a free list may not have + * allocations available for all contexts, so the assumption that presence + * on the free list implies availability is not assertable. 
+ */ - return bit; - } else { - return -1; - } - } + public int alloc(final int size) { + if (size < 0) { + throw new Error("Storage allocation error : negative size passed"); + } - public boolean hasFree() { - for (int i = 0; i < m_ints; i++) { - if (m_bits[i] != 0xFFFFFFFF) { - return true; - } - } + final int bit = RWStore.fndBit(m_transients, m_ints); - return false; - } + if (bit != -1) { + RWStore.setBit(m_bits, bit); + RWStore.setBit(m_transients, bit); + return bit; + } else { + return -1; + } + } + + public boolean hasFree() { + for (int i = 0; i < m_ints; i++) { + if (m_bits[i] != 0xFFFFFFFF) { + return true; + } + } + + return false; + } + public int getAllocBits() { - int total = m_ints * 32; - int allocBits = 0; - for (int i = 0; i < total; i++) { - if (RWStore.tstBit(m_bits, i)) { - allocBits++; - } - } - - return allocBits; + int total = m_ints * 32; + int allocBits = 0; + for (int i = 0; i < total; i++) { + if (RWStore.tstBit(m_bits, i)) { + allocBits++; + } + } + + return allocBits; } - public String getStats() { - final int total = m_ints * 32; - final int allocBits = getAllocBits(); + public String getStats() { + final int total = m_ints * 32; + final int allocBits = getAllocBits(); - return "Addr : " + m_addr + " [" + allocBits + "::" + total + "]"; - } + return "Addr : " + m_addr + " [" + allocBits + "::" + total + "]"; + } - public void addAddresses(final ArrayList addrs, final int rootAddr) { - final int total = m_ints * 32; - - for (int i = 0; i < total; i++) { - if (RWStore.tstBit(m_bits, i)) { - addrs.add(new Integer(rootAddr - i)); - } - } - } + public void addAddresses(final ArrayList addrs, final int rootAddr) { + final int total = m_ints * 32; + + for (int i = 0; i < total; i++) { + if (RWStore.tstBit(m_bits, i)) { + addrs.add(new Integer(rootAddr - i)); + } + } + } + + /** + * Store m_commit bits in m_saveCommit then duplicate transients to m_commit. 
+ * + * This ensures, that while shadowed, the allocator will not re-use storage + * that was allocated prior to the shadow creation. + */ + public void shadow() { + m_saveCommit = m_commit; + m_commit = m_transients.clone(); + } + + /** + * The transient bits will have been added to correctly, we now just need to + * restore the commit bits from the m_saveCommit, to allow re-allocation + * of non-committed storage. + */ + public void deshadow() { + m_commit = m_saveCommit; + m_saveCommit = null; + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/Allocator.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; +import com.bigdata.journal.IAllocationContext; + public interface Allocator extends Comparable { public int getBlockSize(); public void setIndex(int index); @@ -35,7 +37,7 @@ public long getStartAddr(); public boolean addressInRange(int addr); public boolean free(int addr, int size); - public int alloc(RWStore store, int size); + public int alloc(RWStore store, int size, IAllocationContext context); public int getDiskAddr(); public void setDiskAddr(int addr); public long getPhysicalAddress(int offset); @@ -46,10 +48,10 @@ public boolean hasFree(); public void setFreeList(ArrayList list); public String getStats(AtomicLong counter); - public void preserveSessionData(); public void addAddresses(ArrayList addrs); public int getRawStartAddr(); public int getIndex(); public void appendShortStats(StringBuffer str); + public boolean canImmediatelyFree(int addr, int size, IAllocationContext context); } \ No newline at end of file Modified: 
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/BlobAllocator.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -7,6 +7,7 @@ import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; +import com.bigdata.journal.IAllocationContext; import com.bigdata.util.ChecksumUtility; /** @@ -35,6 +36,8 @@ public BlobAllocator(RWStore store, int sortAddr) { m_store = store; m_sortAddr = sortAddr; + + System.out.println("New BlobAllocator"); } public void addAddresses(ArrayList addrs) { @@ -46,7 +49,7 @@ return false; } - public int alloc(RWStore store, int size) { + public int alloc(RWStore store, int size, IAllocationContext context) { assert size > m_store.m_maxFixedAlloc; return 0; @@ -92,7 +95,41 @@ return false; } + + public int getFirstFixedForBlob(int addr, int sze) { + if (sze < m_store.m_maxFixedAlloc) + throw new IllegalArgumentException("Unexpected address size"); + int alloc = m_store.m_maxFixedAlloc-4; + int blcks = (alloc - 1 + sze)/alloc; + + int hdr_idx = (-addr) & RWStore.OFFSET_BITS_MASK; + if (hdr_idx > m_hdrs.length) + throw new IllegalArgumentException("free BlobAllocation problem, hdr offset: " + hdr_idx + ", avail:" + m_hdrs.length); + + int hdr_addr = m_hdrs[hdr_idx]; + + if (hdr_addr == 0) { + throw new IllegalArgumentException("getFirstFixedForBlob called with unallocated address"); + } + + // read in header block, then free each reference + byte[] hdr = new byte[(blcks+1) * 4 + 4]; // add space for checksum + m_store.getData(hdr_addr, hdr); + + try { + DataInputStream instr = new DataInputStream( + new ByteArrayInputStream(hdr, 0, hdr.length-4) ); + int nallocs = instr.readInt(); + int faddr = instr.readInt(); + + return faddr; + + } catch (IOException 
ioe) { + throw new RuntimeException("Unable to retrieve first fixed address", ioe); + } + } + public int getBlockSize() { // Not relevant for Blobs return 0; @@ -269,4 +306,22 @@ return m_hdrs[offset] != 0; } + /** + * This is okay as a NOP. The true allocation is managed by the + * FixedAllocators. + */ + public void detachContext(IAllocationContext context) { + // NOP + } + + /** + * Since the real allocation is in the FixedAllocators, this should delegate + * to the first address, in which case + */ + public boolean canImmediatelyFree(int addr, int size, IAllocationContext context) { + int faddr = this.getFirstFixedForBlob(addr, size); + + return m_store.getBlockByAddress(faddr).canImmediatelyFree(faddr, 0, context); + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-09-01 13:35:08 UTC (rev 3476) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-09-01 14:19:05 UTC (rev 3477) @@ -30,6 +30,7 @@ import org.apache.log4j.Logger; +import com.bigdata.journal.IAllocationContext; import com.bigdata.util.ChecksumUtility; /** @@ -47,8 +48,6 @@ private int m_diskAddr; int m_index; - protected boolean m_preserveSession = false; - public void setIndex(int index) { AllocBlock fb = (AllocBlock) m_allocBlocks.get(0); if (log.isDebugEnabled()) @@ -57,10 +56,6 @@ m_index = index; } - public void preserveSessionData() { - m_preserveSession = true; - } - public long getStartAddr() { return RWStore.convertAddr(m_startAddr); } @@ -125,9 +120,31 @@ } } + volatile private IAllocationContext m_context; + public void setAllocationContext(IAllocationContext context) { + if (context == null && m_context != null) { + // restore commit bits in AllocBlocks + for (AllocBlock allocBlock : m_allocBlocks) { + allocBlock.deshadow(); + } + } else if 
(context != null & m_context == null) { + // restore commit bits in AllocBlocks + for (AllocBlock allocBlock : m_allocBlocks) { + allocBlock.shadow(); + } + } + m_context = context; + } + + /** + * write called on commit, so this is the point when "transient frees" - the + * freeing of previously committed memory can be made available since we + * are creating a new commit point - the condition being that m_freeBits + * was zero and m_freeTransients not. + */ public byte[] write() { try { - final AllocBlock fb = (AllocBlock) m_allocBlocks.get(0); + final AllocBlock fb = m_allocBlocks.get(0); if (log.isDebugEnabled()) log.debug("writing allocator " + m_index + " for " + getStartAddr() + " with " + fb.m_bits[0]); final byte[] buf = new byte[1024]; @@ -144,17 +161,29 @@ str.writeInt(block.m_bits[i]); } - if (!m_preserveSession) { - block.m_transients = (int[]) block.m_bits.clone(); + if (!m_store.isSessionPreserved()) { + block.m_transients = block.m_bits.clone(); } - block.m_commit = (int[]) block.m_bits.clone(); + /** + * If this allocator is shadowed then copy the new + * committed state to m_saveCommit + */ + if (m_context != null) { + assert block.m_saveCommit != null; + + block.m_saveCommit = block.m_bits.clone(); + } else if (m_store.isSessionPreserved()) { + block.m_commit = block.m_transients.clone(); + } else { + block.m_commit = block.m_bits.clone(); + } } // add checksum final int chk = ChecksumUtility.getCHK().checksum(buf, str.size()); str.writeInt(chk); - if (!m_preserveSession) { + if (!m_store.isSessionPreserved()) { m_freeBits += m_freeTransients; // Handle re-addition to free list once transient frees are @@ -234,6 +263,8 @@ private final ArrayList<AllocBlock> m_allocBlocks; + private RWStore m_store; + /** * Calculating the number of ints (m_bitSize) cannot rely on a power of 2. Previously this * assumption was sufficient to guarantee a rounding on to an 64k boundary. 
However, now @@ -248,8 +279,9 @@ * @param preserveSessionData * @param cache */ - FixedAllocator(final int size, final boolean preserveSessionData, final RWWriteCacheService cache) { + FixedAllocator(final RWStore store, final int size, final RWWriteCacheService cache) { m_diskAddr = 0; + m_store = store; m_size = size; @@ -289,8 +321,6 @@ m_freeTransients = 0; m_freeBits = 32 * m_bitSize * numBlocks; - - m_preserveSession = preserveSessionData; } public String getStats(final AtomicLong counter) { @@ -351,7 +381,7 @@ final int block = offset/nbits; if (((AllocBlock) m_allocBlocks.get(block)) - .freeBit(offset % nbits, getPhysicalAddress(offset + 3))) { // bit adjust + .freeBit(offset % n... [truncated message content] |