Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv12401/src/java/com/bigdata/journal Modified Files: Tx.java ITx.java IRootBlockView.java RootBlockView.java TransactionServer.java IAtomicStore.java TemporaryStore.java IJournal.java Name2Addr.java FileMetadata.java Journal.java Added Files: CommitRecord.java LocalTimestampService.java CommitRecordIndex.java ICommitRecord.java ITimestampService.java IIndexManager.java CommitRecordSerializer.java Log Message: Working through transactional isolation. --- NEW FILE: CommitRecordIndex.java --- package com.bigdata.journal; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.CognitiveWeb.extser.LongPacker; import com.bigdata.objndx.BTree; import com.bigdata.objndx.BTreeMetadata; import com.bigdata.objndx.IValueSerializer; import com.bigdata.objndx.KeyBuilder; import com.bigdata.rawstore.Addr; import com.bigdata.rawstore.IRawStore; /** * BTree mapping commit times to {@link ICommitRecord}s. The keys are the long * integers. The values are {@link Entry} objects recording the commit time of * the index and the {@link Addr address} of the {@link ICommitRecord} for that * commit time. */ public class CommitRecordIndex extends BTree { /** * @todo refactor to share with the {@link Journal}? */ private KeyBuilder keyBuilder = new KeyBuilder(); // /** // * Cache of added/retrieved commit records. // * // * @todo This only works for exact timestamp matches so the cache might not // * be very useful here. Also, this must be a weak value cache or it will // * leak memory. // */ // private Map<Long, ICommitRecord> cache = new HashMap<Long, ICommitRecord>(); public CommitRecordIndex(IRawStore store) { super(store, DEFAULT_BRANCHING_FACTOR, ValueSerializer.INSTANCE); } /** * Load from the store. * * @param store * The backing store. * @param metadataId * The metadata record for the index. 
*/ public CommitRecordIndex(IRawStore store, BTreeMetadata metadata) { super(store, metadata); } /** * Encodes the commit time into a key. * * @param commitTime * The commit time. * * @return The corresponding key. */ protected byte[] getKey(long commitTime) { return keyBuilder.reset().append(commitTime).getKey(); } /** * Existence test for a commit record with the specified commit timestamp. * * @param commitTime * The commit timestamp. * @return true iff such an {@link ICommitRecord} exists in the index with * that commit timestamp. */ public boolean hasTimestamp(long commitTime) { return super.contains(getKey(commitTime)); } /** * Return the {@link ICommitRecord} with the given timestamp (exact match). * This method tests a cache of the named btrees and will return the same * instance if the index is found in the cache. * * @param commitTime * The commit time. * * @return The {@link ICommitRecord} index or <code>null</code> iff there * is no {@link ICommitTimestamp} for that commit time. */ public ICommitRecord get(long commitTime) { ICommitRecord commitRecord; // commitRecord = cache.get(commitTime); // // if (commitRecord != null) // return commitRecord; final Entry entry = (Entry) super.lookup(getKey(commitTime)); if (entry == null) { return null; } /* * re-load the commit record from the store. */ commitRecord = loadCommitRecord(store,entry.addr); // /* // * save commit time -> commit record mapping in transient cache (this // * only works for for exact matches on the timestamp so the cache may // * not be very useful here). // */ // cache.put(commitRecord.getTimestamp(),commitRecord); // return btree. return commitRecord; } /** * Return the {@link ICommitRecord} having the largest timestamp that is * strictly less than the given timestamp. * * @param timestamp * The given timestamp. * * @return The commit record -or- <code>null</code> iff there are no * {@link ICommitRecord}s in the that satisify the probe. 
* * @see #get(long) */ public ICommitRecord find(long timestamp) { final int index = findIndexOf(timestamp); if(index == -1) { // No match. return null; } // return the matched record. Entry entry = (Entry) super.valueAt( index ); return loadCommitRecord(store,entry.addr); } /** * Find the index of the {@link ICommitRecord} having the largest timestamp * that is less than or equal to the given timestamp. * * @return The index of the {@link ICommitRecord} having the largest * timestamp that is less than or equal to the given timestamp -or- * <code>-1</code> iff there are no {@link ICommitRecord}s * defined. */ public int findIndexOf(long timestamp) { int pos = super.indexOf(getKey(timestamp)); if (pos < 0) { /* * the key lies between the entries in the index, or possible before * the first entry in the index. [pos] represents the insert * position. we convert it to an entry index and subtract one to get * the index of the first commit record less than the given * timestamp. */ pos = -(pos+1); if(pos == 0) { // No entry is less than or equal to this timestamp. return -1; } pos--; return pos; } else { /* * exact hit on an entry. */ return pos; } } /** * Re-load a commit record from the store. * * @param store * The store. * @param addr * The address of the {@link ICommitRecord}. * * @return The {@link ICommitRecord} loaded from the specified address. */ protected ICommitRecord loadCommitRecord(IRawStore store, long addr) { return CommitRecordSerializer.INSTANCE.deserialize(store.read(addr, null)); } /** * Add an entry for a commit record.. * * @param commitRecord * The commit record. * * @param commitRecordAddr * The address at which that commit record was written on the * store. * * @exception IllegalArgumentException * if <i>commitRecord</i> is <code>null</code>. * @exception IllegalArgumentException * if there is already a {@link ICommitRecord} registered * under for the {@link ICommitRecord#getTimestamp()}. 
* @exception IllegalArgumentException * if <i>addr</i> is invalid. */ public void add(long commitRecordAddr, ICommitRecord commitRecord) { if (commitRecord == null) throw new IllegalArgumentException(); if (commitRecordAddr == 0L) throw new IllegalArgumentException(); final long commitTime = commitRecord.getTimestamp(); final byte[] key = getKey(commitTime); if(super.contains(key)) { throw new IllegalArgumentException( "commit record exists: timestamp=" + commitTime); } // add an entry to the persistent index. super.insert(key,new Entry(commitTime,commitRecordAddr)); // // add to the transient cache. // cache.put(commitTime, commitRecord); } /** * An entry in the persistent index. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public static class Entry { /** * The commit time. */ public final long commitTime; /** * The {@link Addr address} of the {@link ICommitRecord} whose commit * timestamp is {@link #commitTime}. */ public final long addr; public Entry(long commitTime,long addr) { this.commitTime = commitTime; this.addr = addr; } } /** * The values are {@link Entry}s. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public static class ValueSerializer implements IValueSerializer { /** * */ private static final long serialVersionUID = 8085229450005958541L; public static transient final IValueSerializer INSTANCE = new ValueSerializer(); final public static transient int VERSION0 = 0x0; public ValueSerializer() { } public void putValues(DataOutputStream os, Object[] values, int n) throws IOException { os.writeInt(VERSION0); for (int i = 0; i < n; i++) { Entry entry = (Entry) values[i]; LongPacker.packLong(os, entry.commitTime); LongPacker.packLong(os, entry.addr); } } public void getValues(DataInputStream is, Object[] values, int n) throws IOException { final int version = is.readInt(); if (version != VERSION0) throw new RuntimeException("Unknown version: " + version); for (int i = 0; i < n; i++) { final long commitTime = Long.valueOf(LongPacker.unpackLong(is)); final long addr = Long.valueOf(LongPacker.unpackLong(is)); values[i] = new Entry(commitTime,addr); } } } } --- NEW FILE: ITimestampService.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. 
Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 16, 2007 */ package com.bigdata.journal; /** * A service for unique timestamps. * * @todo define a low-latency implementation for use with a distributed database * commit protocol. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public interface ITimestampService { /** * Return the next unique timestamp. */ public long nextTimestamp(); } --- NEW FILE: CommitRecordSerializer.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. 
Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ package com.bigdata.journal; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import org.CognitiveWeb.extser.LongPacker; import com.bigdata.io.ByteBufferInputStream; import com.bigdata.rawstore.Bytes; /** * A helper class for (de-)serializing the root addresses. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @todo we could use run length encoding to write only the used roots. */ public class CommitRecordSerializer { public static final int VERSION0 = 0x0; public static final transient CommitRecordSerializer INSTANCE = new CommitRecordSerializer(); public byte[] serialize(ICommitRecord commitRecord) { final long timestamp = commitRecord.getTimestamp(); final int n = commitRecord.getRootAddrCount(); try { ByteArrayOutputStream baos = new ByteArrayOutputStream( n * Bytes.SIZEOF_LONG); DataOutputStream dos = new DataOutputStream(baos); dos.writeInt(VERSION0); dos.writeLong(timestamp); LongPacker.packLong(dos, n); for(int i=0; i<n; i++) { LongPacker.packLong(dos, commitRecord.getRootAddr(i)); } return baos.toByteArray(); } catch (IOException ex) { throw new RuntimeException(ex); } } public ICommitRecord deserialize(ByteBuffer buf) { try { ByteBufferInputStream bbis = new ByteBufferInputStream(buf); DataInputStream dis = new DataInputStream(bbis); final int version = dis.readInt(); if (version != VERSION0) throw new RuntimeException("Unknown version: " + version); final long timestamp = dis.readLong(); final int n = (int)LongPacker.unpackLong(dis); final long[] roots = new long[n]; for (int i = 0; i < n; i++) { 
roots[i] = LongPacker.unpackLong(dis); } return new CommitRecord(timestamp,roots); } catch (IOException ex) { throw new RuntimeException(ex); } } } --- NEW FILE: ICommitRecord.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 16, 2007 */ package com.bigdata.journal; import com.bigdata.rawstore.Addr; /** * An interface providing a read-only view of a commit record. A commit record * is written on each commit. The basic metadata in the commit record are the * root addresses from which various critical resources may be loaded, e.g., * data structures for mapping index names to their addresses on the * {@link Journal}, etc. 
The {@link Journal} maintains an
 * {@link Journal#getCommitRecord()} index over the commit records so that
 * {@link Tx transactions} can rapidly recover the commit record corresponding
 * to their historical, read-only ground state.
 *
 * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
 * @version $Id$
 *
 * @todo consider modifying to allow named root addresses for a small #of root
 *       names. E.g., a set of {name,addr} pairs that is either scanned as an
 *       array list or looked up using a hash table.
 */
public interface ICommitRecord {

    /**
     * The #of root addresses. Their indices are [0:N-1].
     */
    int MAX_ROOT_ADDRS = 50;

    /**
     * Index of the first root address available for a user-defined object.
     * The root addresses at indices below this value (the first 10) are
     * reserved for use by the bigdata architecture.
     */
    int FIRST_USER_ROOT = 10;

    /**
     * The timestamp assigned to this commit record.
     *
     * @return The commit timestamp -or- <code>0L</code> iff there is no
     *         {@link ICommitRecord} written on the {@link Journal}.
     */
    long getTimestamp();

    /**
     * The #of allowed root addresses.
     */
    int getRootAddrCount();

    /**
     * The last address stored at the specified root index in this commit
     * record.
     *
     * @param index
     *            The index of the root {@link Addr address}.
     *
     * @return The {@link Addr address} stored at that index.
     *
     * @exception IndexOutOfBoundsException
     *                if the index is negative or too large.
     */
    long getRootAddr(int index);

}
Index: Name2Addr.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Name2Addr.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** Name2Addr.java 9 Feb 2007 18:56:59 -0000 1.2 --- Name2Addr.java 17 Feb 2007 21:34:17 -0000 1.3 *************** *** 146,150 **** /* re-load btree from the store. */ ! btree = loadBTree(store,entry.name,entry.addr); // save name -> btree mapping in transient cache. 
--- 146,150 ---- /* re-load btree from the store. */ ! btree = BTreeMetadata.load(this.store, entry.addr); // save name -> btree mapping in transient cache. *************** *** 157,183 **** /** - * Re-load a named index from the store. - * <p> - * The default implementation uses the {@link BTree} constructor. In you - * need to return either a subclass of {@link BTree} or another - * implementation of {@link IIndex} then you MUST override this method to - * use the appropriate constructor. - * - * @param store - * The store. - * @param name - * The index name. - * @param addr - * The address of the metadata record. - * - * @return The named index as loaded from the specified address. - */ - protected IIndex loadBTree(IRawStore store, String name, long addr) { - - return new BTree(this.store, BTreeMetadata.read(this.store, addr)); - - } - - /** * Add an entry for the named index. * --- 157,160 ---- Index: IJournal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/IJournal.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** IJournal.java 10 Feb 2007 15:37:44 -0000 1.4 --- IJournal.java 17 Feb 2007 21:34:17 -0000 1.5 *************** *** 64,68 **** * @version $Id$ */ ! public interface IJournal extends IRawStore, IAtomicStore, IStore { /** --- 64,68 ---- * @version $Id$ */ ! public interface IJournal extends IRawStore, IAtomicStore, IIndexManager { /** *************** *** 89,139 **** /** - * Register a named index. Once registered the index will participate in - * atomic commits. - * <p> - * Note: A named index must be registered outside of any transaction before - * it may be used inside of a transaction. - * <p> - * Note: The return object MAY differ from the supplied {@link BTree}. For - * example, when using partitioned indices the {@link BTree} is encapsulated - * within an abstraction that knows how to managed index partitions. 
- * - * @param name - * The name that can be used to recover the index. - * - * @param btree - * The btree. - * - * @return The object that would be returned by {@link #getIndex(String)}. - * - * @todo The provided {@link BTree} must serve as a prototype so that it is - * possible to retain additional metadata. - */ - public IIndex registerIndex(String name, IIndex btree); - - /** - * Drops the named index (unisolated). The index is removed as a - * {@link ICommitter} and all resources dedicated to that index are - * reclaimed, including secondary index segment files, the metadata index, - * etc. - * - * @param name - * The name of the index to be dropped. - * - * @exception IllegalArgumentException - * if <i>name</i> does not identify a registered index. - * - * @todo add a rename index method, but note that names in the file system - * would not change. - * - * @todo declare a method that returns or visits the names of the registered - * indices. - * - * @todo consider adding a delete method that releases all resources - * (including indices) and then closes the journal. - */ - public void dropIndex(String name); - - /** * Return the named index which MAY may be invalidated by a * {@link IAtomicStore#commit()}. --- 89,92 ---- Index: IAtomicStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/IAtomicStore.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** IAtomicStore.java 15 Feb 2007 01:34:22 -0000 1.3 --- IAtomicStore.java 17 Feb 2007 21:34:17 -0000 1.4 *************** *** 68,71 **** --- 68,74 ---- * Request an atomic commit. * + * @return The timestamp assigned to the {@link ICommitRecord} -or- 0L if + * there were no data to commit. + * * @exception IllegalStateException * if the store is not open. *************** *** 73,77 **** * if the store is not writable. */ ! public void commit(); /** --- 76,80 ---- * if the store is not writable. */ ! 
public long commit(); /** *************** *** 107,116 **** * committed state of the store. * ! * @param rootSlot ! * The root slot identifier. * ! * @return The {@link Addr address} written in that slot. */ ! public long getAddr(int rootSlot); } --- 110,122 ---- * committed state of the store. * ! * @param index ! * The index of the root {@link Addr address}. * ! * @return The {@link Addr address} stored at that index. ! * ! * @exception IndexOutOfBoundsException ! * if the index is negative or too large. */ ! public long getRootAddr(int index); } Index: FileMetadata.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/FileMetadata.java,v retrieving revision 1.11 retrieving revision 1.12 diff -C2 -d -r1.11 -r1.12 *** FileMetadata.java 15 Feb 2007 20:59:21 -0000 1.11 --- FileMetadata.java 17 Feb 2007 21:34:18 -0000 1.12 *************** *** 1,2 **** --- 1,45 ---- + /** + + The Notice below must appear in each file of the Source Code of any + copy you distribute of the Licensed Product. Contributors to any + Modifications may add their own copyright notices to identify their + own contributions. + + License: + + The contents of this file are subject to the CognitiveWeb Open Source + License Version 1.1 (the License). You may not copy or use this file, + in either source code or executable form, except in compliance with + the License. You may obtain a copy of the License from + + http://www.CognitiveWeb.org/legal/license/ + + Software distributed under the License is distributed on an AS IS + basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + the License for the specific language governing rights and limitations + under the License. + + Copyrights: + + Portions created by or assigned to CognitiveWeb are Copyright + (c) 2003-2003 CognitiveWeb. All Rights Reserved. 
Contact + information for CognitiveWeb is available at + + http://www.CognitiveWeb.org + + Portions Copyright (c) 2002-2003 Bryan Thompson. + + Acknowledgements: + + Special thanks to the developers of the Jabber Open Source License 1.0 + (JOSL), from which this License was derived. This License contains + terms that differ from JOSL. + + Special thanks to the CognitiveWeb Open Source Contributors for their + suggestions and support of the Cognitive Web. + + Modifications: + + */ package com.bigdata.journal; *************** *** 462,472 **** nextOffset = 0; final long commitCounter = 0L; final long firstTxId = 0L; final long lastTxId = 0L; ! final long[] rootIds = new long[ RootBlockView.MAX_ROOT_ADDRS ]; IRootBlockView rootBlock0 = new RootBlockView(true, segmentId, ! nextOffset, firstTxId, lastTxId, commitCounter, rootIds); IRootBlockView rootBlock1 = new RootBlockView(false, segmentId, ! nextOffset, firstTxId, lastTxId, commitCounter, rootIds); FileChannel channel = raf.getChannel(); channel.write(rootBlock0.asReadOnlyBuffer(), OFFSET_ROOT_BLOCK0); --- 505,519 ---- nextOffset = 0; final long commitCounter = 0L; + final long commitTimestamp = 0L; final long firstTxId = 0L; final long lastTxId = 0L; ! final long commitRecordAddr = 0L; ! final long commitRecordIndexAddr = 0L; IRootBlockView rootBlock0 = new RootBlockView(true, segmentId, ! nextOffset, firstTxId, lastTxId, commitTimestamp, ! commitCounter, commitRecordAddr, commitRecordIndexAddr); IRootBlockView rootBlock1 = new RootBlockView(false, segmentId, ! nextOffset, firstTxId, lastTxId, commitTimestamp, ! 
commitCounter, commitRecordAddr, commitRecordIndexAddr); FileChannel channel = raf.getChannel(); channel.write(rootBlock0.asReadOnlyBuffer(), OFFSET_ROOT_BLOCK0); Index: TransactionServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/TransactionServer.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TransactionServer.java 8 Nov 2006 15:18:13 -0000 1.1 --- TransactionServer.java 17 Feb 2007 21:34:17 -0000 1.2 *************** *** 89,93 **** * target. Achieving higher concurrency will require more localized * mechanisms, e.g., single row atomic updates. There is also a use case ! * for read uncommitted transactions in order to permit earlier GC with * very very long running transactions, which would otherwise defer GC * until their completion. --- 89,93 ---- * target. Achieving higher concurrency will require more localized * mechanisms, e.g., single row atomic updates. There is also a use case ! * for read committed transactions in order to permit earlier GC with * very very long running transactions, which would otherwise defer GC * until their completion. *************** *** 514,515 **** --- 514,702 ---- } + + //* + //* FIXME Detect transaction identifiers that go backwards? For example tx0 + //* starts on one segment A while tx1 starts on segment B. Tx0 later starts + //* on segment B. From the perspective of segment B, tx0 begins after tx1. + //* This does not look like a problem unless there is an intevening commit, + //* at which point tx0 and tx1 will have starting contexts that differ by the + //* write set of the commit.<br> + //* What exactly is the impact when transactions start out of sequence? Do we + //* need to negotiated a distributed start time among all segments on which + //* the transaction starts? That would be a potential source of latency and + //* other kinds of pain. Look at how this is handled in the literature. 
One + //* way to handle it is essentially to declare the intention of the + //* transaction and pre-notify segments that will be written. This requires + //* some means of figuring out that intention and is probably relevant (and + //* solvable) only for very large row or key scans. + //* + //* @todo What exactly is the impact when transactions end out of sequence? I + //* presume that this is absolutely Ok. + + + // /** + // * <p> + // * Deallocate slots for versions having a transaction timestamp less than + // or + // * equal to <i>timestamp</i> that have since been overwritten (or deleted) + // * by a committed transaction having a timestamp greater than + // <i>timestamp</i>. + // * </p> + // * <p> + // * The criteria for deallocating historical versions is that (a) there is + // a + // * more recent version; and (b) there is no ACTIVE (vs PENDING or + // COMPLETED) + // * transaction which could read from that historical version. The journal + // * does NOT locally have enough information to decide when it can swept + // * historical versions written by a given transaction. This notice MUST + // come + // * from a transaction service which has global knowledge of which + // * transactions have PREPARED or ABORTED and can generate notices when all + // * transactions before a given timestamp have been PREPARED or ABORTED. + // For + // * example, a long running transaction can cause notice to be delayed for + // * many short lived transactions that have since completed. Once the long + // * running transaction completes, the transaction server can compute the + // * largest timestamp value below which there are no active transactions + // and + // * generate a single notice with that timestamp. + // * </p> + // * + // * @param timestamp + // * The timestamp. + // * + // * @todo This operation MUST be extremely efficient. 
+ // * + // * @todo This method is exposed suposing a transaction service that will + // * deliver notice when the operation should be conducted based on + // * total knowledge of the state of all transactions running against + // * the distributed database. As such, it may have to scan the journal + // * to locate the commit record for transactions satisifying the + // * timestamp criteria. + // */ + // void gcTx( long timestamp ) { + + // // * <p> + // // * Note: Migration to the read-optimized database is NOT a + // pre-condition for + // // * deallocation of historical versions - rather it enables us to remove + // the + // // * <em>current</em> committed version from the journal. + // // * </p> + + // /* + // * FIXME Implement garbage collection of overwritten and unreachable + // * versions. Without migration to a read-optimized database, GC by + // * itself is NOT sufficient to allow us to deallocate versions that have + // * NOT been overwritten and hence is NOT sufficient to allow us to + // * discard historical transactions in their entirety. + // * + // * Given a transaction Tn that overwrites one or more pre-existing + // * versions, the criteria for deallocation of the overwritten versions + // * are: + // * + // * (A) Tn commits, hence its intention has been made persistent; and + // * + // * (B) There are no active transactions remaining that started from a + // * committed state before the commit state resulting from Tn, hence the + // * versions overwritten by Tn are not visible to any active transaction. + // * Any new transaction will read through the committed state produced by + // * Tn and will perceive the new versions rather than the overwritten + // * versions. 
+ // * + // * Therefore, once Tn commits (assuming it has overwritten at least one + // * pre-existing version), we can add each concurrent transaction Ti that + // * is still active when Tn commits to a set of transactions that must + // * either validate or abort before we may GC(Tn). Since Tn has committed + // * it is not possible for new transactions to be created that would have + // * to be included in this set since any new transaction would start from + // * the committed state of Tn or its successors in the serialization + // * order. As transactions validate or abort they are removed from + // * GC(Tn). When this set is empty, we garbage collect the pre-existing + // * versions that were overwritten by Tn. + // * + // * The sets GC(T) must be restart safe. Changes to the set can only + // * occur when a transaction commits or aborts. However, even the abort + // * of a transaction MUST be noticable on restart. + // * + // * A summary may be used that is the highest transaction timestamp for + // * which Tn must wait before running GC(Tn). That can be written once + // * + // * + // * Note that multiple transactions may have committed, so we may find + // * that Tn has successors in the commit/serialization order that also + // * meet the above criteria. All such committed transactions may be + // * processed at once, but they MUST be processed in their commit order. + // * + // * Once those pre-conditions have been met the following algorithm is + // * applied to GC the historical versions that were overwritten by Tn: + // * + // * 1. For each write by Ti where n < i <= m that overwrote a historical + // * version, deallocate the slots for that historical version. This is + // * valid since there are no active transactions that can read from that + // * historical state. 
The processing order for Ti probably does not + // * matter, but in practice there may be a reason to choose the + // * serialization or reverse serialization order + // * + // * ---- this is getting closed but is not yet correct ---- + // * + // * All versions written by a given transaction have the timestamp of + // * that transaction. + // * + // * The committed transactions are linked by their commit records into a + // * reverse serialization sequence. + // * + // * Each committed transaction has an object index that is accessible + // * from its commit record. The entries in this index are versions that + // * were written (or deleted) by that transaction. This index reads + // * through into the object index for the committed state of the journal + // * from which the transaction was minted. + // * + // * We could maintain in the entry information about the historical + // * version that was overwritten. For example, its firstSlot or a run + // * length encoding of the slots allocated to the historical version. + // * + // * We could maintain an index for all overwritten versions from + // * [timestamp + dataId] to [slotId] (or a run-length encoding of the + // * slots on which the version was written). Given a timestamp, we would + // * then do a key scan from the start of the index for all entries whose + // * timestamp was less than or equal to the given timestamp. For each + // * such entry, we would deallocate the version and delete the entry from + // * the index. + // * + // * tx0 : begin tx0 : write id0 (v0) tx0 : commit journal : deallocate <= + // * tx0 (NOP since no overwritten versions) + // * + // * tx1 : begin tx2 : begin tx1 : write id0 (v1) tx1 : commit journal : + // * deallocate <= tx1 (MUST NOT BE GENERATED since dependencies exist : + // * tx1 and tx0 both depend on the committed state of tx0 -- sounds like + // * lock style dependencies for deallocation !) 
tx2 : commit journal : + // * deallocate <= tx2 + // * + // * index:: [ tx0 : id0 ] : v0 [ tx1 : id1 ] : v1 + // * + // * keyscan <= tx2 + // */ + + // } + + // /** + // * The transaction identifier of the last transaction begun on this + // journal. + // * In order to avoid extra IO this value survives restart IFF there is an + // * intervening commit by any active transaction. This value is used to + // * reject transactions whose identifier arrives out of sequence at the + // * journal. + // * + // * @return The transaction identifier or <code>-1</code> if no + // * transactions have begun on the journal (or if no transactions + // * have ever committed and no transaction has begun since restart). + // */ + // public long getLastBegunTx() { + // + // return lastBegunTx; + // + // } + // private long lastBegunTx = -1; + --- NEW FILE: LocalTimestampService.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. 
Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 16, 2007 */ package com.bigdata.journal; import com.bigdata.util.TimestampFactory; /** * A purely local implementation of an {@link ITimestampService} using a * {@link TimestampFactory} to assign distinct timestamps. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class LocalTimestampService implements ITimestampService { public static final transient ITimestampService INSTANCE = new LocalTimestampService(); public long nextTimestamp() { return TimestampFactory.nextNanoTime(); } } Index: Tx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Tx.java,v retrieving revision 1.28 retrieving revision 1.29 diff -C2 -d -r1.28 -r1.29 *** Tx.java 15 Feb 2007 22:01:18 -0000 1.28 --- Tx.java 17 Feb 2007 21:34:17 -0000 1.29 *************** *** 54,60 **** import com.bigdata.isolation.IsolatedBTree; import com.bigdata.isolation.UnisolatedBTree; - import com.bigdata.objndx.BTree; import com.bigdata.objndx.IIndex; import com.bigdata.objndx.IndexSegment; import com.bigdata.rawstore.IRawStore; import com.bigdata.scaleup.MetadataIndex; --- 54,60 ---- import com.bigdata.isolation.IsolatedBTree; import com.bigdata.isolation.UnisolatedBTree; import com.bigdata.objndx.IIndex; import com.bigdata.objndx.IndexSegment; + import com.bigdata.objndx.ReadOnlyIndex; import com.bigdata.rawstore.IRawStore; import com.bigdata.scaleup.MetadataIndex; *************** *** 66,73 **** * named indices, and perform operations on those indices, and the operations * will be isolated according to the isolation level of the transaction. When ! 
* using an isolated transaction, writes are accumulated in an * {@link IsolatedBTree}. The write set is validated when the transaction * {@link #prepare()}s and finally merged down onto the global state when the ! * transaction commits. * </p> * --- 66,74 ---- * named indices, and perform operations on those indices, and the operations * will be isolated according to the isolation level of the transaction. When ! * using a writable isolated transaction, writes are accumulated in an * {@link IsolatedBTree}. The write set is validated when the transaction * {@link #prepare()}s and finally merged down onto the global state when the ! * transaction commits. When the transaction is read-only, writes will be ! * rejected and {@link #prepare()} and {@link #commit()} are NOPs. * </p> * *************** *** 75,83 **** * @version $Id$ * ! * @todo Transactions may be requested that are read-only for some historical ! * timestamp, that are read-committed (data committed by _other_ * transactions during the transaction will be visible within that ! * transaction), or that are fully isolated (changes made in other ! * transactions are not visible within the transaction). * * @todo The write set of a transaction is currently written onto an independent --- 76,95 ---- * @version $Id$ * ! * @todo Support read-committed transactions (data committed by _other_ * transactions during the transaction will be visible within that ! * transaction). Read-committed transactions do NOT permit writes (they ! * are read-only). Prepare and commit are NOPs. This might be a distinct ! * implementation sharing a common base class for handling the run state ! * stuff, or just a distinct implementation altogether. The read-committed ! * transaction is implemented by just reading against the named indices on ! * the journal. However, since commits or overflows of the journal might ! * invalidate the index objects we may have to setup a delegation ! 
* mechanism that resolves the named index either on each operation or ! * whenever the transaction receives notice that the index must be ! * discarded. In order to NOT see in-progress writes, the read-committed ! * transaction actually needs to dynamically resolve the most recent ! * {@link ICommitRecord} and then use the named indices resolved from ! * that. This suggests an event notification mechanism for commits so that ! * we can get the commit record more cheaply. * * @todo The write set of a transaction is currently written onto an independent *************** *** 105,109 **** * (so we don't throw it away until no transactions can reach back that * far) as well as an index into the named indices index -- perhaps simply ! * an index by timestamp into the root addresses (or whole root block * views, or moving the root addresses out of the root block and into the * store with only the address of the root addresses in the root block). --- 117,121 ---- * (so we don't throw it away until no transactions can reach back that * far) as well as an index into the named indices index -- perhaps simply ! * an index by startTimestamp into the root addresses (or whole root block * views, or moving the root addresses out of the root block and into the * store with only the address of the root addresses in the root block). *************** *** 138,144 **** /** ! * The timestamp assigned to this transaction. */ ! final protected long timestamp; /** --- 150,171 ---- /** ! * The start timestamp assigned to this transaction. ! * <p> ! * Note: Transaction {@link #startTimestamp} and {@link #commitTimestamp}s ! * are assigned by a global time service. The time service must provide ! * unique times for transaction start and commit timestamps and for commit ! * times for unisolated {@link Journal#commit()}s. */ ! final protected long startTimestamp; ! ! /** ! * The commit timestamp assigned to this transaction. ! */ ! private long commitTimestamp; ! ! /** !
* True iff the transaction is read only and will reject writes. ! */ ! final protected boolean readOnly; /** *************** *** 148,151 **** --- 175,185 ---- final protected long commitCounter; + /** + * The historical {@link ICommitRecord} chosen as the ground state for this + * transaction. All indices isolated by this transaction are isolated as of + * the discoverable root address based on this commit record. + */ + final private ICommitRecord commitRecord; + private RunState runState; *************** *** 165,232 **** /** ! * BTrees isolated by this transactions. */ ! private Map<String, IsolatedBTree> btrees = new HashMap<String, IsolatedBTree>(); /** ! * Return a named index. The index will be isolated at the same level as ! * this transaction. Changes on the index will be made restart-safe iff the ! * transaction successfully commits. ! * ! * @param name ! * The name of the index. ! * ! * @return The named index or <code>null</code> if no index is registered ! * under that name. */ ! public IIndex getIndex(String name) { ! ! if(name==null) throw new IllegalArgumentException(); ! ! /* ! * store the btrees in hash map so that we can recover the same instance ! * on each call within the same transaction. ! */ ! BTree btree = btrees.get(name); ! ! if(btree == null) { ! ! /* ! * see if the index was registered. ! * ! * FIXME this gets the last committed unisolated index. We need to ! * add a timestamp parameter and look up the appropriate metadata ! * record based on both the name and the timestamp (first metadata ! * record for the named index having a timestamp LT the transaction ! * timestamp). since this is always a request for historical and ! * read-only data, this method should not register a committer and ! * the returned btree should not participate in the commit protocol. ! */ ! UnisolatedBTree src = (UnisolatedBTree)journal.getIndex(name); ! ! // the named index was never registered. ! if(name==null) return null; ! ! /* ! * Isolate the named btree. !
*/ ! return new IsolatedBTree(tmpStore,src); ! ! ! } ! ! return btree; ! } ! ! /** ! * This method must be invoked any time a transaction completes (aborts or ! * commits) in order to release the hard references to any named btrees ! * isolated within this transaction so that the JVM may reclaim the space ! * allocated to them on the heap. ! */ ! private void releaseBTrees() { ! ! btrees.clear(); } --- 199,212 ---- /** ! * Indices isolated by this transaction. */ ! private Map<String, IIndex> btrees = new HashMap<String, IIndex>(); /** ! * Create a fully isolated read-write transaction. */ ! public Tx(Journal journal,long timestamp) { ! this(journal,timestamp,false); } *************** *** 234,249 **** /** * Create a transaction starting the last committed state of the journal as ! * of the specified timestamp. * * @param journal * The journal. * ! * @param timestamp ! * The timestamp. * * @exception IllegalStateException * if the transaction state has been garbage collected. */ ! public Tx(Journal journal, long timestamp ) { if (journal == null) --- 214,238 ---- /** * Create a transaction starting the last committed state of the journal as ! * of the specified startTimestamp. * * @param journal * The journal. * ! * @param timestamp ! * The start timestamp, which MUST be assigned consistently based on a ! * {@link ITimestampService}. Note that a transaction does not ! * start up on all {@link Journal}s at the same time. Instead, ! * the transaction start timestamp is assigned by a centralized ! * time service and then provided each time a transaction object ! * must be created for isolation on some {@link Journal}. ! * ! * @param readOnly ! * When true the transaction will reject writes and ! * {@link #prepare()} and {@link #commit()} will be NOPs. * * @exception IllegalStateException * if the transaction state has been garbage collected. */ !
public Tx(Journal journal, long timestamp, boolean readOnly) { if (journal == null) *************** *** 252,257 **** this.journal = journal; ! this.timestamp = timestamp; journal.activateTx(this); --- 241,248 ---- this.journal = journal; ! this.startTimestamp = timestamp; + this.readOnly = readOnly; + journal.activateTx(this); *************** *** 262,265 **** --- 253,263 ---- */ this.commitCounter = journal.getRootBlockView().getCommitCounter(); + + /* + * The commit record serving as the ground state for the indices + * isolated by this transaction (MAY be null, in which case the + * transaction will be unable to isolate any indices). + */ + this.commitRecord = journal.getCommitRecord(timestamp); this.runState = RunState.ACTIVE; *************** *** 268,278 **** /** ! * The transaction identifier (aka timestamp). * ! * @return The transaction identifier (aka timestamp). */ ! final public long getId() { ! return timestamp; } --- 266,311 ---- /** ! * The transaction identifier. * ! * @return The transaction identifier. ! * ! * @todo verify that this has the semantics of the transaction start time ! * and that the startTimestamp is (must be) assigned by the same service ! * that assigns the {@link #getCommitTimestamp()}. */ ! final public long getStartTimestamp() { ! return startTimestamp; ! ! } ! ! /** ! * Return the commit timestamp assigned to this transaction. ! * ! * @return The commit timestamp assigned to this transaction. ! * ! * @exception IllegalStateException ! * unless the transaction is writable and {@link #isPrepared()} ! * or {@link #isCommitted()}. ! */ ! final public long getCommitTimestamp() { ! ! if(readOnly) { ! ! throw new IllegalStateException(); ! ! } ! ! switch(runState) { ! case ACTIVE: ! case ABORTED: ! throw new IllegalStateException(); ! case PREPARED: ! case COMMITTED: ! if(commitTimestamp == 0L) throw new AssertionError(); ! return commitTimestamp; ! } ! !
throw new AssertionError(); } *************** *** 280,284 **** public String toString() { ! return ""+timestamp; } --- 313,329 ---- public String toString() { ! return ""+startTimestamp; ! ! } ! ! /** ! * This method must be invoked any time a transaction completes (aborts or ! * commits) in order to release the hard references to any named btrees ! * isolated within this transaction so that the JVM may reclaim the space ! * allocated to them on the heap. ! */ ! private void releaseBTrees() { ! ! btrees.clear(); } *************** *** 300,328 **** } ! throw new IllegalStateException(NOT_ACTIVE); ! } ! try { /* ! * Validate against the current state of the journal's object index. */ - if( ! validate() ) { - abort(); ! ! throw new RuntimeException("Validation failed: write-write conflict"); ! } ! ! } catch( Throwable t ) { ! ! abort(); ! ! throw new RuntimeException("Unexpected error: "+t, t); ! } --- 345,392 ---- } ! throw new IllegalStateException(NOT_ACTIVE); ! } ! if (!readOnly) { /* ! * The commit timestamp is assigned when we prepare the transaction ! * since the commit protocol does not permit unisolated writes ! * once a transaction begins to prepare until the transaction has ! * either committed or aborted (if such writes were allowed then we ! * would have to re-validate the transaction in order to enforce ! * serializability). ! * ! * @todo resolve this against a service in a manner that will ! * support a distributed database commit protocol. */ + commitTimestamp = journal.timestampFactory.nextTimestamp(); + + try { + + /* + * Validate against the current state of the journal's object + * index. + */ + + if (!validate()) { + + abort(); + + throw new RuntimeException( + "Validation failed: write-write conflict"); + + } + + } catch (Throwable t) { abort(); ! ! throw new RuntimeException("Unexpected error: " + t, t); ! } ! } *************** *** 349,353 **** * already complete, then it is aborted. */ ! public void commit() { if( !
isPrepared() ) { --- 413,417 ---- * already complete, then it is aborted. */ ! public long commit() { if( ! isPrepared() ) { *************** *** 363,387 **** } - /* - * Merge each isolated index into the global scope. This also marks the - * slots used by the versions written by the transaction as 'committed'. - * This operation MUST succeed (at a logical level) since we have - * already validated (neither read-write nor write-write conflicts - * exist). - * - * @todo Non-transactional operations on the global scope should be - * either disallowed entirely or locked out during the prepare-commit - * protocol when using transactions since they (a) could invalidate the - * pre-condition for the merge; and (b) uncommitted changes would be - * discarded if the merge operation fails. One solution is to use - * batch operations or group commit mechanism to dynamically create - * transactions from unisolated operations. - */ try { ! mergeOntoGlobalState(); ! // Atomic commit. ! journal.commit(this); runState = RunState.COMMITTED; --- 427,457 ---- } try { ! if(!readOnly) { ! ! /* ! * Merge each isolated index into the global scope. This also ! * marks the slots used by the versions written by the ! * transaction as 'committed'. This operation MUST succeed (at a ! * logical level) since we have already validated (neither ! * read-write nor write-write conflicts exist). ! * ! * @todo Non-transactional operations on the global scope should ! * be either disallowed entirely or locked out during the ! * prepare-commit protocol when using transactions since they ! * (a) could invalidate the pre-condition for the merge; and (b) ! * uncommitted changes would be discarded if the merge operation ! * fails. One solution is to use batch operations or group ! * commit mechanism to dynamically create transactions from ! * unisolated operations. ! */ ! ! mergeOntoGlobalState(); ! // Atomic commit. ! journal.commit(this); ! ! 
} runState = RunState.COMMITTED; *************** *** 421,424 **** --- 491,496 ---- } + + return getCommitTimestamp(); } *************** *** 449,452 **** --- 521,530 ---- } + final public boolean isReadOnly() { + + return ... [truncated message content] |