Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv5461/src/java/com/bigdata/journal Modified Files: TransientBufferStrategy.java AbstractBufferStrategy.java TemporaryStore.java Name2Addr.java IJournal.java IBufferStrategy.java CommitRecordIndex.java ICommitRecord.java DiskOnlyStrategy.java ForceEnum.java CommitRecord.java Tx.java ITx.java BasicBufferStrategy.java MappedBufferStrategy.java BlockWriteCache.java Options.java DirectBufferStrategy.java FileMetadata.java Journal.java CommitRecordSerializer.java Added Files: IMROW.java TemporaryRawStore.java Removed Files: DataDeletedException.java PageCommitList.java Log Message: Further work supporting transactional isolation. Index: ForceEnum.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/ForceEnum.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** ForceEnum.java 31 Oct 2006 17:11:58 -0000 1.1 --- ForceEnum.java 21 Feb 2007 20:17:21 -0000 1.2 *************** *** 65,69 **** * The file is not forced to stable storage. */ ! No("no"), /** --- 65,69 ---- * The file is not forced to stable storage. */ ! No("No"), /** *************** *** 72,76 **** * @see FileChannel#force(boolean) */ ! Force("force"), /** --- 72,76 ---- * @see FileChannel#force(boolean) */ ! Force("Force"), /** *************** *** 79,83 **** * @see FileChannel#force(boolean) */ ! ForceMetadata("metadata"); private String name; --- 79,83 ---- * @see FileChannel#force(boolean) */ ! ForceMetadata("ForceMetadata"); private String name; *************** *** 94,98 **** /** ! * Parse a string whose contents must be "no", "force", "forceMetadata". * * @param s --- 94,98 ---- /** ! * Parse a string whose contents must be "No", "Force", "ForceMetadata". * * @param s Index: Name2Addr.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Name2Addr.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** Name2Addr.java 17 Feb 2007 21:34:17 -0000 1.3 --- Name2Addr.java 21 Feb 2007 20:17:21 -0000 1.4 *************** *** 43,46 **** --- 43,49 ---- * the use of the {@link #keyBuilder} and therefore minimizes the relatively * expensive operation of encoding unicode names to byte[] keys. + * + * @todo use a weak value cache so that unused indices may be swept by the + * GC. */ private Map<String,IIndex> name2BTree = new HashMap<String,IIndex>(); *************** *** 144,149 **** } ! /* re-load btree from the store. ! */ btree = BTreeMetadata.load(this.store, entry.addr); --- 147,151 ---- } ! // re-load btree from the store. 
btree = BTreeMetadata.load(this.store, entry.addr); Index: MappedBufferStrategy.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/MappedBufferStrategy.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** MappedBufferStrategy.java 8 Feb 2007 21:32:10 -0000 1.8 --- MappedBufferStrategy.java 21 Feb 2007 20:17:21 -0000 1.9 *************** *** 5,8 **** --- 5,9 ---- import com.bigdata.rawstore.Addr; + import com.bigdata.scaleup.PartitionedJournal; /** *************** *** 26,29 **** --- 27,38 ---- * @see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4724038 * @see BufferMode#Mapped + * + * @todo since the mapped file can not be extended or truncated, use of mapped + * files pretty much suggests that we pre-extend to the maximum allowable + * extent. it something can not be committed due to overflow, then a tx + * could be re-committed after a rollover of a {@link PartitionedJournal}. + * Note that the use of mapped files might not prove worth the candle due + * to the difficulties with resource deallocation for this strategy and + * the good performance of some alternative strategies. */ public class MappedBufferStrategy extends DiskBackedBufferStrategy { *************** *** 39,42 **** --- 48,57 ---- */ final MappedByteBuffer mappedBuffer; + + public boolean isFullyBuffered() { + + return false; + + } MappedBufferStrategy(long maximumExtent, FileMetadata fileMetadata) { --- NEW FILE: IMROW.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 20, 2007 */ package com.bigdata.journal; import com.bigdata.rawstore.IRawStore; /** * A marker interface for a store that supports Multiple Readers, One Writer. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public interface IMROW extends IRawStore { } --- DataDeletedException.java DELETED --- Index: CommitRecordSerializer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/CommitRecordSerializer.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** CommitRecordSerializer.java 17 Feb 2007 21:34:18 -0000 1.1 --- CommitRecordSerializer.java 21 Feb 2007 20:17:21 -0000 1.2 *************** *** 73,76 **** --- 73,78 ---- final long timestamp = commitRecord.getTimestamp(); + final long commitCounter = commitRecord.getCommitCounter(); + final int n = commitRecord.getRootAddrCount(); *************** *** 85,88 **** --- 87,92 ---- dos.writeLong(timestamp); + + dos.writeLong(commitCounter); LongPacker.packLong(dos, n); *************** *** 119,122 **** --- 123,128 ---- final long timestamp = dis.readLong(); + final long commitCounter = dis.readLong(); + final int n = (int)LongPacker.unpackLong(dis); *************** *** 129,133 **** } ! return new CommitRecord(timestamp,roots); } catch (IOException ex) { --- 135,139 ---- } ! return new CommitRecord(timestamp,commitCounter,roots); } catch (IOException ex) { Index: ICommitRecord.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/ICommitRecord.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** ICommitRecord.java 17 Feb 2007 21:34:17 -0000 1.1 --- ICommitRecord.java 21 Feb 2007 20:17:21 -0000 1.2 *************** *** 86,90 **** */ public long getTimestamp(); ! /** * The #of allowed root addresses. --- 86,97 ---- */ public long getTimestamp(); ! ! /** ! * The commit counter associated with the commit record. This is used by ! * transactions in order to determine whether or not intervening commits ! * have occurred since the transaction start time. ! */ ! public long getCommitCounter(); ! /** * The #of allowed root addresses. Index: ITx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/ITx.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** ITx.java 19 Feb 2007 01:05:39 -0000 1.3 --- ITx.java 21 Feb 2007 20:17:21 -0000 1.4 *************** *** 67,71 **** /** ! * Prepare the transaction for a {@link #commit()}. * * @exception IllegalStateException --- 67,71 ---- /** ! * Validate the write set for the transaction. 
* * @exception IllegalStateException Index: AbstractBufferStrategy.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** AbstractBufferStrategy.java 15 Feb 2007 22:01:18 -0000 1.10 --- AbstractBufferStrategy.java 21 Feb 2007 20:17:21 -0000 1.11 *************** *** 5,9 **** import java.nio.ByteBuffer; import java.nio.channels.FileChannel; - import java.text.Format; import java.text.NumberFormat; --- 5,8 ---- Index: Journal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Journal.java,v retrieving revision 1.54 retrieving revision 1.55 diff -C2 -d -r1.54 -r1.55 *** Journal.java 20 Feb 2007 00:27:03 -0000 1.54 --- Journal.java 21 Feb 2007 20:17:21 -0000 1.55 *************** *** 50,57 **** import java.io.File; import java.nio.ByteBuffer; - import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; --- 50,57 ---- import java.io.File; import java.nio.ByteBuffer; import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; *************** *** 66,69 **** --- 66,70 ---- import com.bigdata.rawstore.Addr; import com.bigdata.rawstore.Bytes; + import com.bigdata.util.concurrent.DaemonThreadFactory; /** *************** *** 135,139 **** * writes and deletes on the journal). (However, note transaction processing MAY * be concurrent since the write set of a transaction is written on a ! * {@link TemporaryStore}.) * </p> * --- 136,140 ---- * writes and deletes on the journal). (However, note transaction processing MAY * be concurrent since the write set of a transaction is written on a ! * {@link TemporaryRawStore}.) * </p> * *************** *** 195,199 **** * directed to a journal using a disk-only mode. */ ! public class Journal implements IJournal, ITransactionManager { /** --- 196,200 ---- * directed to a journal using a disk-only mode. */ ! public class Journal implements IJournal { /** *************** *** 752,755 **** --- 753,762 ---- } + public boolean isFullyBuffered() { + + return _bufferStrategy.isFullyBuffered(); + + } + /** * Return a read-only view of the current root block. *************** *** 840,844 **** public void abort() { ! // clear the root addresses - the will be reloaded. _commitRecord = null; --- 847,851 ---- public void abort() { ! // clear the root addresses - they will be reloaded. _commitRecord = null; *************** *** 932,937 **** */ final ICommitRecord commitRecord = new CommitRecord(commitTimestamp, ! rootAddrs); final long commitRecordAddr = write(ByteBuffer --- 939,948 ---- */ + final IRootBlockView old = _rootBlock; + + final long newCommitCounter = old.getCommitCounter() + 1; + final ICommitRecord commitRecord = new CommitRecord(commitTimestamp, ! 
newCommitCounter, rootAddrs); final long commitRecordAddr = write(ByteBuffer *************** *** 978,983 **** { - final IRootBlockView old = _rootBlock; - /* * Update the firstTxId the first time a transaction commits and the --- 989,992 ---- *************** *** 1011,1015 **** !old.isRootBlock0(), old.getSegmentId(), _bufferStrategy .getNextOffset(), firstTxId, lastTxId, ! commitTimestamp, old.getCommitCounter() + 1, commitRecordAddr, commitRecordIndexAddr); --- 1020,1024 ---- !old.isRootBlock0(), old.getSegmentId(), _bufferStrategy .getNextOffset(), firstTxId, lastTxId, ! commitTimestamp, newCommitCounter, commitRecordAddr, commitRecordIndexAddr); *************** *** 1068,1076 **** } ! public ByteBuffer read(long addr, ByteBuffer dst) { assertOpen(); ! return _bufferStrategy.read(addr, dst); } --- 1077,1085 ---- } ! public ByteBuffer read(long addr) { assertOpen(); ! return _bufferStrategy.read(addr); } *************** *** 1105,1115 **** if (commitRecordAddr == 0L) { ! _commitRecord = new CommitRecord(0L); } else { _commitRecord = CommitRecordSerializer.INSTANCE ! .deserialize(_bufferStrategy.read(commitRecordAddr, ! null)); } --- 1114,1123 ---- if (commitRecordAddr == 0L) { ! _commitRecord = new CommitRecord(); } else { _commitRecord = CommitRecordSerializer.INSTANCE ! .deserialize(_bufferStrategy.read(commitRecordAddr)); } *************** *** 1287,1295 **** * that satisify the probe. * ! * @todo this implies that timestamps for all commits are generated by a ! * global timestamp service. */ public ICommitRecord getCommitRecord(long timestamp) { ! return _commitRecordIndex.find(timestamp); --- 1295,1306 ---- * that satisify the probe. * ! * @todo the {@link CommitRecordIndex} is a possible source of thread ! * contention since transactions need to use this code path in order ! * to locate named indices but the {@link #commitService} can also ! * write on this index. I have tried some different approaches to ! * handling this. */ public ICommitRecord getCommitRecord(long timestamp) { ! return _commitRecordIndex.find(timestamp); *************** *** 1331,1344 **** * prepared or aborted. */ ! final Map<Long, ITx> activeTx = new HashMap<Long, ITx>(); /** * A hash map containing all transactions that have prepared but not yet ! * either committed or aborted. ! * ! * @todo this is probably useless. A transaction will be in this map only ! * while it is actively committing. */ ! final Map<Long, ITx> preparedTx = new HashMap<Long, ITx>(); /** --- 1342,1354 ---- * prepared or aborted. */ ! final Map<Long, ITx> activeTx = new ConcurrentHashMap<Long, ITx>(); /** * A hash map containing all transactions that have prepared but not yet ! * either committed or aborted. A transaction will be in this map only while ! * it is actively committing, so this is always a "map" of one and could be ! * replaced by a scalar reference. */ ! final Map<Long, ITx> preparedTx = new ConcurrentHashMap<Long, ITx>(); /** *************** *** 1349,1354 **** */ final ExecutorService commitService = Executors ! .newSingleThreadExecutor(Executors.defaultThreadFactory()); ! public long newTx() { --- 1359,1365 ---- */ final ExecutorService commitService = Executors ! .newSingleThreadExecutor(DaemonThreadFactory ! .defaultThreadFactory()); ! 
public long newTx() { *************** *** 1359,1362 **** --- 1370,1376 ---- public long newTx(boolean readOnly) { + // System.err.println("#active=" + activeTx.size() + ", #committed=" + // + _rootBlock.getCommitCounter()); + return new Tx(this, timestampFactory.nextTimestamp(), readOnly) .getStartTimestamp(); *************** *** 1389,1396 **** throw new IllegalArgumentException("No such tx: " + ts); ! tx.abort(); } public long commit(long ts) { --- 1403,1427 ---- throw new IllegalArgumentException("No such tx: " + ts); ! if(tx.isReadOnly()) return; ! ! // queue the abort request. ! commitService.submit(new AbortTask(tx)); } + /** + * @todo Implement group commit. large transactions do not get placed into + * commit groups. Group commit collects up to N 'small' transactions + * with at most M milliseconds of latency and perform a single atomic + * commit for that group of transactions, forcing the data to disk and + * then returning control to the clients. Note that we need to collect + * transactions once they have completed but not yet validated. Any + * transaction in the group that fails validation must be aborted. + * Validation needs to be against then current state of the database, + * so the process is serialized (validate+commit) with transactions + * that fail validation being placed onto an abort queue. A single + * unisolated commit is then performed for the group, so there is one + * SYNC per commit. + */ public long commit(long ts) { *************** *** 1400,1403 **** --- 1431,1444 ---- throw new IllegalArgumentException("No such tx: " + ts); + /* + * A read-only transaction can commit immediately since validation and + * commit are basically NOPs. + * + * @todo We could also detect transactions with empty write sets (no + * touched indices) and shortcut the prepare/commit for those + * transactions as well. The easy way to do this is to rangeCount each + * index isolated by the transaction. + */ + if(tx.isReadOnly()) { *************** *** 1472,1475 **** --- 1513,1549 ---- /** + * Task aborts a transaction when it is run by the + * {@link Journal#commitService}. This is used to serialize some abort + * processing which would otherwise be concurrent with commit processing. + * {@link Journal#abort()} makes some assumptions that it is a + * single-threaded environment and those assumptions would be violated + * without serialization aborts and commits on the same queue. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ + private static class AbortTask implements Callable<Long> { + + private final ITx tx; + + public AbortTask(ITx tx) { + + assert tx != null; + + this.tx = tx; + + } + + public Long call() throws Exception { + + tx.abort(); + + return 0L; + + } + + } + + /** * Notify the journal that a new transaction is being activated (starting on * the journal). *************** *** 1556,1560 **** * Lookup an active or prepared transaction (exact match). * ! * @param startTimestamp * The start timestamp for the transaction. * --- 1630,1634 ---- * Lookup an active or prepared transaction (exact match). * ! * @param startTime * The start timestamp for the transaction. * *************** *** 1563,1573 **** * transaction. */ ! public ITx getTx(long startTimestamp) { ! ITx tx = activeTx.get(startTimestamp); if (tx == null) { ! tx = preparedTx.get(startTimestamp); } --- 1637,1647 ---- * transaction. */ ! public ITx getTx(long startTime) { ! ITx tx = activeTx.get(startTime); if (tx == null) { ! 
tx = preparedTx.get(startTime); } --- NEW FILE: TemporaryRawStore.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 15, 2007 */ package com.bigdata.journal; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import com.bigdata.rawstore.Addr; import com.bigdata.rawstore.Bytes; import com.bigdata.rawstore.IRawStore; /** * A non-restart-safe store for temporary data that buffers data in memory until * a maximum capacity has been reached and then converts to a disk-based store * with a maximum capacity of 2G. The maximum capacity constraint is imposed by * {@link Addr}. On conversion to a disk-backed store, the disk file is created * using the temporary file mechansism and is marked for eventual deletion no * later than when the JVM exits and as soon as the store is {@link #close()}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @todo The {@link TemporaryRawStore} would benefit from any caching or AIO * solutions developed for the {@link DiskOnlyStrategy}. * * @todo The temporary store does NOT currently support MROW since it does not * lock out readers when converting from a {@link TransientBufferStrategy} * to a {@link DiskOnlyStrategy}. At this time, MROW is not required for * the {@link TemporaryRawStore} since it is used by a single-threaded * process such as a transaction and not by multiple threads (unlike the * Journal which does support MROW). */ public class TemporaryRawStore implements IRawStore { protected final static int DEFAULT_INITIAL_IN_MEMORY_EXTENT = Bytes.megabyte32 * 10; protected final static int DEFAULT_MAXIMUM_IN_MEMORY_EXTENT = Bytes.megabyte32 * 100; /** * The initial size of the in-memory buffer. This buffer will grow as * necessary until {@link #maximumInMemoryExtent} at which point the store * will convert over to a disk-based mechanism. */ public final long initialInMemoryExtent; /** * The maximum capacity of the in-memory buffer. */ public final long maximumInMemoryExtent; /** * Whether or not a direct buffer was used for the in-memory store (not * recommended). 
*/ public final boolean useDirectBuffers; private boolean open = true; private IBufferStrategy buf; public IBufferStrategy getBufferStrategy() { return buf; } /** * Create a {@link TemporaryRawStore} with an initial in-memory capacity of 10M * that will grow up to 100M before converting into a disk-based store * backed by a temporary file. * * @todo the memory growth strategy does not respect the in-memory maximum * without parameterizing and overriding the #of bytes to extent the * store on overflow for {@link TransientBufferStrategy} */ public TemporaryRawStore() { this( DEFAULT_INITIAL_IN_MEMORY_EXTENT, DEFAULT_MAXIMUM_IN_MEMORY_EXTENT, false ); } /** * Create a {@link TemporaryRawStore} with the specified configuration. * * @param initialInMemoryExtent * The initial size of the in-memory buffer. This buffer will * grow as necessary until <i>maximumInMemoryExtent</i> at which * point the store will convert over to a disk-based mechanism. * @param maximumInMemoryExtent * The maximum capacity of the in-memory buffer. The actual * maximum may differ slightly based on the buffer growth policy. * @param useDirectBuffers * Whether or not the in-memory buffer will be direct. The use of * a direct buffer here is NOT recommended. */ public TemporaryRawStore(long initialInMemoryExtent, long maximumInMemoryExtent, boolean useDirectBuffers) { buf = new TransientBufferStrategy(initialInMemoryExtent, maximumInMemoryExtent, useDirectBuffers); this.initialInMemoryExtent = initialInMemoryExtent; this.maximumInMemoryExtent = maximumInMemoryExtent; this.useDirectBuffers = useDirectBuffers; } /** * Close the store and delete the associated file, if any. */ public void close() { if(!open) throw new IllegalStateException(); open = false; buf.close(); buf.deleteFile(); buf = null; } public void force(boolean metadata) { if(!open) throw new IllegalStateException(); buf.force(metadata); } public boolean isOpen() { return open; } /** * Always returns <code>false</code> since the store will be deleted as soon * as it is closed. */ public boolean isStable() { if(!open) throw new IllegalStateException(); return false; } public boolean isFullyBuffered() { if(!open) throw new IllegalStateException(); return buf.isFullyBuffered(); } public ByteBuffer read(long addr) { if(!open) throw new IllegalStateException(); return buf.read(addr); } public long write(ByteBuffer data) { if(!open) throw new IllegalStateException(); try { return buf.write(data); } catch(OverflowException ex) { if(buf instanceof TransientBufferStrategy) { overflowToDisk(); return buf.write(data); } else { throw ex; } } } protected void overflowToDisk() { System.err.println("TemporaryRawStore: overflow to disk; nbytes=" + buf.getNextOffset()); TransientBufferStrategy tmp = (TransientBufferStrategy)buf; int segmentId = 0; File file = null; // request a unique filename. /* * Set the initial extent to be large enough for the root blocks plus * twice the data in the in-memory buffer. */ long initialExtent = FileMetadata.headerSize0 + tmp.getUserExtent() * 2; final boolean create = true; final boolean readOnly = false; // Do not force writes since the store is not restart safe. final ForceEnum forceWrites = ForceEnum.No; /* Create a unique store file and setup the root blocks. The file * will be pre-extended to the requested initialExtent. */ FileMetadata fileMetadata = new FileMetadata(segmentId, file, BufferMode.Disk, useDirectBuffers, initialExtent, create, readOnly, forceWrites); // Mark the file for deletion on exit. 
fileMetadata.file.deleteOnExit(); // Open the disk-based store file. DiskOnlyStrategy diskBuf = new DiskOnlyStrategy(Bytes.gigabyte * 2, fileMetadata); try { /* * Transfer the data from the in-memory buffer to the disk. The * write begins immediately after the file header. */ // setup the transfer source. ByteBuffer b = tmp.directBuffer; b.limit(tmp.nextOffset); b.position(0); // write the data on the channel. diskBuf.channel.write(b,diskBuf.headerSize); // increment the offset. diskBuf.nextOffset += tmp.nextOffset; } catch(IOException ex) { try { diskBuf.close(); } catch (Throwable ex2) { } throw new RuntimeException(); } this.buf = diskBuf; } } Index: IJournal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/IJournal.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** IJournal.java 17 Feb 2007 21:34:17 -0000 1.5 --- IJournal.java 21 Feb 2007 20:17:21 -0000 1.6 *************** *** 64,68 **** * @version $Id$ */ ! public interface IJournal extends IRawStore, IAtomicStore, IIndexManager { /** --- 64,69 ---- * @version $Id$ */ ! public interface IJournal extends IMROW, IRawStore, IAtomicStore, ! IIndexManager, ITransactionManager { /** Index: CommitRecordIndex.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** CommitRecordIndex.java 17 Feb 2007 21:34:17 -0000 1.1 --- CommitRecordIndex.java 21 Feb 2007 20:17:21 -0000 1.2 *************** *** 66,69 **** --- 66,72 ---- protected byte[] getKey(long commitTime) { + /* + * Note: The {@link KeyBuilder} is NOT thread-safe + */ return keyBuilder.reset().append(commitTime).getKey(); *************** *** 78,82 **** * that commit timestamp. */ ! public boolean hasTimestamp(long commitTime) { return super.contains(getKey(commitTime)); --- 81,85 ---- * that commit timestamp. */ ! synchronized public boolean hasTimestamp(long commitTime) { return super.contains(getKey(commitTime)); *************** *** 95,99 **** * is no {@link ICommitTimestamp} for that commit time. */ ! public ICommitRecord get(long commitTime) { ICommitRecord commitRecord; --- 98,102 ---- * is no {@link ICommitTimestamp} for that commit time. */ ! synchronized public ICommitRecord get(long commitTime) { ICommitRecord commitRecord; *************** *** 141,145 **** * @see #get(long) */ ! public ICommitRecord find(long timestamp) { final int index = findIndexOf(timestamp); --- 144,148 ---- * @see #get(long) */ ! synchronized public ICommitRecord find(long timestamp) { final int index = findIndexOf(timestamp); *************** *** 170,174 **** * defined. */ ! public int findIndexOf(long timestamp) { int pos = super.indexOf(getKey(timestamp)); --- 173,177 ---- * defined. */ ! synchronized public int findIndexOf(long timestamp) { int pos = super.indexOf(getKey(timestamp)); *************** *** 221,226 **** protected ICommitRecord loadCommitRecord(IRawStore store, long addr) { ! return CommitRecordSerializer.INSTANCE.deserialize(store.read(addr, ! null)); } --- 224,228 ---- protected ICommitRecord loadCommitRecord(IRawStore store, long addr) { ! return CommitRecordSerializer.INSTANCE.deserialize(store.read(addr)); } *************** *** 244,248 **** * if <i>addr</i> is invalid. */ ! 
public void add(long commitRecordAddr, ICommitRecord commitRecord) { if (commitRecord == null) --- 246,250 ---- * if <i>addr</i> is invalid. */ ! synchronized public void add(long commitRecordAddr, ICommitRecord commitRecord) { if (commitRecord == null) Index: Options.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Options.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** Options.java 15 Feb 2007 14:23:49 -0000 1.8 --- Options.java 21 Feb 2007 20:17:21 -0000 1.9 *************** *** 153,162 **** * stable storage on a commit (default <code>forceMetadata</code>). * <dl> ! * <dt>no</dt> * <dd>This option is useful when the journal is replicated so that we can * always failover to another server having the same data. Unless the file * is replicated or transient, this mode can lead to lost data if there is a * hardware or software failure.</dd> ! * <dt>force</dt> * <dd>Force the journal contents, but not the file metadata, to stable * storage. The precise semantics of this option are dependent on the OS and --- 153,162 ---- * stable storage on a commit (default <code>forceMetadata</code>). * <dl> ! * <dt>No</dt> * <dd>This option is useful when the journal is replicated so that we can * always failover to another server having the same data. Unless the file * is replicated or transient, this mode can lead to lost data if there is a * hardware or software failure.</dd> ! * <dt>Force</dt> * <dd>Force the journal contents, but not the file metadata, to stable * storage. The precise semantics of this option are dependent on the OS and *************** *** 169,173 **** * always a serious loss, but is normally repairable with a file system * repair utility). </dd> ! * <dt>forceMetadata</dt> * <dd>Force the journal contents and the file metadata to stable storage. * This option is the most secure. </dd> --- 169,173 ---- * always a serious loss, but is normally repairable with a file system * repair utility). </dd> ! * <dt>ForceMetadata</dt> * <dd>Force the journal contents and the file metadata to stable storage. * This option is the most secure. </dd> *************** *** 246,250 **** * metadata are forced). */ ! public final static ForceEnum DEFAULT_FORCE_ON_COMMIT = ForceEnum.ForceMetadata; /** --- 246,250 ---- * metadata are forced). */ ! public final static ForceEnum DEFAULT_FORCE_ON_COMMIT = ForceEnum.Force; /** Index: FileMetadata.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/FileMetadata.java,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 *** FileMetadata.java 17 Feb 2007 21:34:18 -0000 1.12 --- FileMetadata.java 21 Feb 2007 20:17:21 -0000 1.13 *************** *** 408,413 **** switch (bufferMode) { case Direct: { ! // Allocate a direct buffer. ! buffer = ByteBuffer.allocateDirect((int) userExtent); // Setup to read data from file into the buffer. buffer.limit(nextOffset); --- 408,415 ---- switch (bufferMode) { case Direct: { ! // Allocate the buffer buffer. ! buffer = (useDirectBuffers ? ByteBuffer ! .allocateDirect((int) userExtent) : ByteBuffer ! .allocate((int) userExtent)); // Setup to read data from file into the buffer. buffer.limit(nextOffset); *************** *** 531,535 **** case Direct: /* ! * Allocate a direct buffer. * * Note that we do not read in any data since no user data has --- 533,537 ---- case Direct: /* ! * Allocate the buffer. 
* * Note that we do not read in any data since no user data has *************** *** 537,545 **** * to avoid possible overwrites. */ - // buffer = ByteBuffer.allocateDirect((int) userExtent); buffer = (useDirectBuffers ? ByteBuffer ! .allocateDirect((int)userExtent) ! : ByteBuffer ! .allocate((int)userExtent)); break; case Mapped: --- 539,545 ---- * to avoid possible overwrites. */ buffer = (useDirectBuffers ? ByteBuffer ! .allocateDirect((int) userExtent) : ByteBuffer ! .allocate((int) userExtent)); break; case Mapped: Index: Tx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Tx.java,v retrieving revision 1.31 retrieving revision 1.32 diff -C2 -d -r1.31 -r1.32 *** Tx.java 19 Feb 2007 19:00:20 -0000 1.31 --- Tx.java 21 Feb 2007 20:17:21 -0000 1.32 *************** *** 57,60 **** --- 57,61 ---- import com.bigdata.objndx.IndexSegment; import com.bigdata.objndx.ReadOnlyIndex; + import com.bigdata.rawstore.Bytes; import com.bigdata.scaleup.MetadataIndex; import com.bigdata.scaleup.PartitionedIndex; *************** *** 72,79 **** * </p> * <p> ! * The write set of a transaction is written onto a {@link TemporaryStore}. * Therefore the size limit on the transaction write set is currently 2G, but ! * the transaction will run in memory up to 100M. The {@link TemporaryStore} is ! * closed and any backing file is deleted as soon as the transaction completes. * </p> * <p> --- 73,81 ---- * </p> * <p> ! * The write set of a transaction is written onto a {@link TemporaryRawStore}. * Therefore the size limit on the transaction write set is currently 2G, but ! * the transaction will run in memory up to 100M. The {@link TemporaryRawStore} ! * is closed and any backing file is deleted as soon as the transaction ! * completes. * </p> * <p> *************** *** 90,93 **** --- 92,101 ---- * @version $Id$ * + * @todo track whether or not the transaction has written any isolated data. do + * this at the same time that I modify the isolated indices use a + * delegation strategy so that I can trap attempts to access an isolated + * index once the transaction is no longer active. define "active" as + * up to the point where a "commit" or "abort" is _requested_ for the tx. + * * @todo Support read-committed transactions (data committed by _other_ * transactions during the transaction will be visible within that *************** *** 123,127 **** * (so we don't throw it away until no transactions can reach back that * far) as well as an index into the named indices index -- perhaps simply ! * an index by startTimestamp into the root addresses (or whole root block * views, or moving the root addresses out of the root block and into the * store with only the address of the root addresses in the root block). --- 131,135 ---- * (so we don't throw it away until no transactions can reach back that * far) as well as an index into the named indices index -- perhaps simply ! * an index by startTime into the root addresses (or whole root block * views, or moving the root addresses out of the root block and into the * store with only the address of the root addresses in the root block). *************** *** 157,171 **** /** ! * The start startTimestamp assigned to this transaction. * <p> ! * Note: Transaction {@link #startTimestamp} and {@link #commitTimestamp}s * are assigned by a global time service. 
The time service must provide * unique times for transaction start and commit timestamps and for commit * times for unisolated {@link Journal#commit()}s. */ ! final protected long startTimestamp; /** ! * The commit startTimestamp assigned to this transaction. */ private long commitTimestamp; --- 165,179 ---- /** ! * The start startTime assigned to this transaction. * <p> ! * Note: Transaction {@link #startTime} and {@link #commitTimestamp}s * are assigned by a global time service. The time service must provide * unique times for transaction start and commit timestamps and for commit * times for unisolated {@link Journal#commit()}s. */ ! final protected long startTime; /** ! * The commit startTime assigned to this transaction. */ private long commitTimestamp; *************** *** 177,186 **** /** - * The commit counter on the journal as of the time that this transaction - * object was created. - */ - final protected long commitCounter; - - /** * The historical {@link ICommitRecord} choosen as the ground state for this * transaction. All indices isolated by this transaction are isolated as of --- 185,188 ---- *************** *** 192,207 **** /** ! * A store used to hold write sets for the transaction. The same store can ! * be used to buffer resolved write-write conflicts. This uses a ! * {@link TemporaryStore} to avoid issues with maintaining transactions ! * across journal boundaries. This places a limit on transactions of 2G in ! * their serialized write set. Since the indices use a copy-on-write model, ! * the amount of user data can be significantly less due to multiple ! * versions of the same btree nodes. Using smaller branching factors in the ! * isolated index helps significantly to increase the effective utilization ! * of the store since copy-on-write causes fewer bytes to be copied each ! * time it is invoked. */ ! final protected TemporaryStore tmpStore = new TemporaryStore(); /** --- 194,210 ---- /** ! * A store used to hold write sets for read-write transactions (it is null ! * iff the transaction is read-only). The same store can be used to buffer ! * resolved write-write conflicts. This uses a {@link TemporaryRawStore} to ! * avoid issues with maintaining transactions across journal boundaries. ! * This places a limit on transactions of 2G in their serialized write set. ! * <p> ! * Since the indices use a copy-on-write model, the amount of user data can ! * be significantly less due to multiple versions of the same btree nodes. ! * Using smaller branching factors in the isolated index helps significantly ! * to increase the effective utilization of the store since copy-on-write ! * causes fewer bytes to be copied each time it is invoked. */ ! final protected TemporaryRawStore tmpStore; /** *************** *** 215,219 **** public Tx(Journal journal,long timestamp) { ! this(journal,timestamp,false); } --- 218,222 ---- public Tx(Journal journal,long timestamp) { ! this(journal, timestamp, false); } *************** *** 221,234 **** /** * Create a transaction starting the last committed state of the journal as ! * of the specified startTimestamp. * * @param journal * The journal. * ! * @param startTimestamp ! * The startTimestamp, which MUST be assigned consistently based on a * {@link ITimestampService}. Note that a transaction does not * start up on all {@link Journal}s at the same time. Instead, ! 
* the transaction start startTimestamp is assigned by a centralized * time service and then provided each time a transaction object * must be created for isolated on some {@link Journal}. --- 224,237 ---- /** * Create a transaction starting the last committed state of the journal as ! * of the specified startTime. * * @param journal * The journal. * ! * @param startTime ! * The startTime, which MUST be assigned consistently based on a * {@link ITimestampService}. Note that a transaction does not * start up on all {@link Journal}s at the same time. Instead, ! * the transaction start startTime is assigned by a centralized * time service and then provided each time a transaction object * must be created for isolated on some {@link Journal}. *************** *** 248,265 **** this.journal = journal; ! this.startTimestamp = timestamp; this.readOnly = readOnly; journal.activateTx(this); /* - * Stash the commit counter so that we can figure out if there were - * concurrent transactions and therefore whether or not we need to - * validate the write set. - */ - this.commitCounter = journal.getRootBlockView().getCommitCounter(); - - /* * The commit record serving as the ground state for the indices * isolated by this transaction (MAY be null, in which case the --- 251,267 ---- this.journal = journal; ! this.startTime = timestamp; this.readOnly = readOnly; + + this.tmpStore = readOnly ? null : new TemporaryRawStore( + Bytes.megabyte * 1, // initial in-memory size. + Bytes.megabyte * 10, // maximum in-memory size. + false // do NOT use direct buffers. + ); journal.activateTx(this); /* * The commit record serving as the ground state for the indices * isolated by this transaction (MAY be null, in which case the *************** *** 279,283 **** final public int hashCode() { ! return Long.valueOf(startTimestamp).hashCode(); } --- 281,285 ---- final public int hashCode() { ! return Long.valueOf(startTime).hashCode(); } *************** *** 291,295 **** final public boolean equals(ITx o) { ! return this == o || startTimestamp == o.getStartTimestamp(); } --- 293,297 ---- final public boolean equals(ITx o) { ! return this == o || startTime == o.getStartTimestamp(); } *************** *** 301,310 **** * * @todo verify that this has the semantics of the transaction start time ! * and that the startTimestamp is (must be) assigned by the same service * that assigns the {@link #getCommitTimestamp()}. */ final public long getStartTimestamp() { ! return startTimestamp; } --- 303,312 ---- * * @todo verify that this has the semantics of the transaction start time ! * and that the startTime is (must be) assigned by the same service * that assigns the {@link #getCommitTimestamp()}. */ final public long getStartTimestamp() { ! return startTime; } *************** *** 343,347 **** public String toString() { ! return ""+startTimestamp; } --- 345,349 ---- public String toString() { ! return ""+startTime; } *************** *** 362,371 **** /* ! * Close and delete the TemporaryStore. */ ! if (tmpStore.isOpen()) { tmpStore.close(); ! } --- 364,373 ---- /* ! * Close and delete the TemporaryRawStore. */ ! if (tmpStore != null && tmpStore.isOpen()) { tmpStore.close(); ! } *************** *** 396,400 **** /* ! * The commit startTimestamp is assigned when we prepare the transaction * since the the commit protocol does not permit unisolated writes * once a transaction begins to prepar until the transaction has --- 398,402 ---- /* ! 
* The commit startTime is assigned when we prepare the transaction * since the the commit protocol does not permit unisolated writes * once a transaction begins to prepar until the transaction has *************** *** 415,419 **** */ ! if (!validate()) { abort(); --- 417,421 ---- */ ! if (!validateWriteSets()) { abort(); *************** *** 641,645 **** * Validate all isolated btrees written on by this transaction. */ ! private boolean validate() { assert ! readOnly; --- 643,647 ---- * Validate all isolated btrees written on by this transaction. */ ! private boolean validateWriteSets() { assert ! readOnly; *************** *** 647,656 **** /* * This compares the current commit counter on the journal with the ! * commit counter at the time that this transaction was started. If they ! * are the same, then no intervening commits have occurred on the ! * journal and there is nothing to validate. */ ! if(journal.getRootBlockView().getCommitCounter() == commitCounter) { return true; --- 649,660 ---- /* * This compares the current commit counter on the journal with the ! * commit counter as of the start time for the transaction. If they are ! * the same, then no intervening commits have occurred on the journal ! * and there is nothing to validate. */ ! if (commitRecord == null ! || (journal.getRootBlockView().getCommitCounter() == commitRecord ! .getCommitCounter())) { return true; Index: TemporaryStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/TemporaryStore.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** TemporaryStore.java 19 Feb 2007 19:00:23 -0000 1.5 --- TemporaryStore.java 21 Feb 2007 20:17:21 -0000 1.6 *************** *** 43,286 **** */ /* ! * Created on Feb 15, 2007 */ package com.bigdata.journal; - import java.io.File; - import java.io.IOException; - import java.nio.ByteBuffer; - import com.bigdata.objndx.IIndex; import com.bigdata.rawstore.Addr; - import com.bigdata.rawstore.Bytes; - import com.bigdata.rawstore.IRawStore; /** ! * A non-restart-safe store for temporary data that buffers data in memory until ! * a maximum capacity has been reached and then converts to a disk-based store ! * with a maximum capacity of 2G. The maximum capacity constraint is imposed by ! * {@link Addr}. On conversion to a disk-backed store, the disk file is created ! * using the temporary file mechansism and is marked for eventual deletion no ! * later than when the JVM exits and as soon as the store is {@link #close()}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ - * - * @todo The {@link TemporaryStore} would benefit from any caching or AIO - * solutions developed for the {@link DiskOnlyStrategy}. */ ! public class TemporaryStore implements IRawStore, IStore { ! ! /** ! * The initial size of the in-memory buffer. This buffer will grow as ! * necessary until {@link #maximumInMemoryExtent} at which point the store ! * will convert over to a disk-based mechanism. ! */ ! public final long initialInMemoryExtent; ! ! /** ! * The maximum capacity of the in-memory buffer. ! */ ! public final long maximumInMemoryExtent; ! ! /** ! * Whether or not a direct buffer was used for the in-memory store (not ! * recommended). ! */ ! public final boolean useDirectBuffers; ! ! private boolean open = true; ! private IBufferStrategy buf; ! ! public IBufferStrategy getBufferStrategy() { ! ! return buf; ! ! } /** ! 
* Create a {@link TemporaryStore} with an initial in-memory capacity of 10M ! * that will grow up to 100M before converting into a disk-based store ! * backed by a temporary file. */ public TemporaryStore() { ! this( Bytes.megabyte*10, Bytes.megabyte*100, false ); } /** - * Create a {@link TemporaryStore} with the specified configuration. - * * @param initialInMemoryExtent - * The initial size of the in-memory buffer. This buffer will - * grow as necessary until <i>maximumInMemoryExtent</i> at which - * point the store will convert over to a disk-based mechanism. * @param maximumInMemoryExtent - * The maximum capacity of the in-memory buffer. The actual - * maximum may differ slightly based on the buffer growth policy. * @param useDirectBuffers - * Whether or not the in-memory buffer will be direct. The use of - * a direct buffer here is NOT recommended. */ public TemporaryStore(long initialInMemoryExtent, long maximumInMemoryExtent, boolean useDirectBuffers) { - - buf = new TransientBufferStrategy(initialInMemoryExtent, maximumInMemoryExtent, - useDirectBuffers); - - this.initialInMemoryExtent = initialInMemoryExtent; - - this.maximumInMemoryExtent = maximumInMemoryExtent; - - this.useDirectBuffers = useDirectBuffers; - - setupName2AddrBTree(); - - } ! /** ! * Close the store and delete the associated file, if any. ! */ ! public void close() { ! ! if(!open) throw new IllegalStateException(); ! ! open = false; ! ! buf.close(); ! ! buf.deleteFile(); ! ! } ! ! public void force(boolean metadata) { ! ! if(!open) throw new IllegalStateException(); ! ! buf.force(metadata); ! ! } ! ! public boolean isOpen() { ! ! return open; ! ! } ! ! /** ! * Always returns <code>false</code> since the store will be deleted as soon ! * as it is closed. ! */ ! public boolean isStable() { ! return false; ! } ! ! public ByteBuffer read(long addr, ByteBuffer dst) { ! ! if(!open) throw new IllegalStateException(); ! ! return buf.read(addr, dst); ! ! } ! ! public long write(ByteBuffer data) { ! ! if(!open) throw new IllegalStateException(); ! ! try { ! ! return buf.write(data); ! ! } catch(OverflowException ex) { ! ! if(buf instanceof TransientBufferStrategy) { ! ! overflowToDisk(); ! ! return buf.write(data); ! ! } else { ! ! throw ex; ! ! } ! ! } ! ! } ! ! protected void overflowToDisk() { ! ! TransientBufferStrategy tmp = (TransientBufferStrategy)buf; ! ! int segmentId = 0; ! ! File file = null; // request a unique filename. ! ! /* ! * Set the initial extent to be large enough for the root blocks plus ! * twice the data in the in-memory buffer. ! */ ! long initialExtent = FileMetadata.headerSize0 + tmp.getUserExtent() * 2; ! ! final boolean create = true; ! ! final boolean readOnly = false; ! ! // Do not force writes since the store is not restart safe. ! final ForceEnum forceWrites = ForceEnum.No; ! ! /* Create a unique store file and setup the root blocks. The file ! * will be pre-extended to the requested initialExtent. ! */ ! FileMetadata fileMetadata = new FileMetadata(segmentId, file, ! BufferMode.Disk, useDirectBuffers, initialExtent, ! create, readOnly, forceWrites); ! ! // Mark the file for deletion on exit. ! fileMetadata.file.deleteOnExit(); ! ! // Open the disk-based store file. ! DiskOnlyStrategy diskBuf = new DiskOnlyStrategy(Bytes.gigabyte * 2, ! fileMetadata); ! ! try { ! /* ! * Transfer the data from the in-memory buffer to the disk. The ! * write begins immediately after the file header. ! */ ! ! // setup the transfer source. ! ByteBuffer b = tmp.directBuffer; ! b.limit(tmp.nextOffset); ! 
b.position(0); ! ! ... [truncated message content]
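The heart of this change set is the commit counter that now travels with each commit record: a transaction captures the counter of its ground-state commit record, and Tx#validateWriteSets() (see the diff above) can skip write-set validation entirely when no other commit has intervened. A minimal standalone sketch of that check follows; this is plain illustrative Java, not bigdata code, and all names in it are made up for the example.

class CommitCounterSketch {

    static long journalCommitCounter = 0; // cf. IRootBlockView#getCommitCounter()

    static final class TxState {
        final long groundStateCommitCounter;
        TxState(long c) { groundStateCommitCounter = c; }
    }

    static boolean mustValidate(TxState tx) {
        // Same test as in Tx#validateWriteSets(): there is nothing to
        // validate unless some other commit has intervened since the
        // transaction's ground state was captured.
        return journalCommitCounter != tx.groundStateCommitCounter;
    }

    public static void main(String[] args) {
        TxState tx = new TxState(journalCommitCounter); // tx captures ground state
        System.out.println(mustValidate(tx)); // false - no intervening commit
        journalCommitCounter++;               // an unrelated transaction commits
        System.out.println(mustValidate(tx)); // true - write set must be validated
    }

}

The same counter is what makes the read-only and empty-write-set shortcuts in Journal#commit(long) cheap, since those paths never need to consult the write set at all.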
From: Bryan T. <tho...@us...> - 2007-02-21 20:17:34
Update of /cvsroot/cweb/bigdata/src/architecture In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv5461/src/architecture Modified Files: segment math.xls Log Message: Further work supporting transactional isolation. Index: segment math.xls =================================================================== RCS file: /cvsroot/cweb/bigdata/src/architecture/segment math.xls,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 Binary files /tmp/cvs7NXqkn and /tmp/cvs359hFH differ
From: Bryan T. <tho...@us...> - 2007-02-21 20:17:34
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/isolation In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv5461/src/java/com/bigdata/isolation Modified Files: UnisolatedBTree.java Log Message: Further work supporting transactional isolation. Index: UnisolatedBTree.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/isolation/UnisolatedBTree.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** UnisolatedBTree.java 19 Feb 2007 01:05:39 -0000 1.6 --- UnisolatedBTree.java 21 Feb 2007 20:17:22 -0000 1.7 *************** *** 117,121 **** * performance. */ ! public static final int DEFAULT_BRANCHING_FACTOR = 32; /** --- 117,121 ---- * performance. */ ! public static final int DEFAULT_BRANCHING_FACTOR = 16; /**
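The branching-factor change above (32 down to 16) follows the rationale in the Tx javadoc from the journal commit: under copy-on-write, every dirty node is copied in full, so a smaller fan-out copies fewer bytes per touched root-to-leaf path even though the tree gets one level deeper. A back-of-the-envelope illustration follows; the entry count and per-entry size are assumptions for the example, not bigdata measurements.

public class BranchingFactorSketch {

    public static void main(String[] args) {
        final int n = 1_000_000;   // entries in the isolated index (assumed)
        final int entryBytes = 64; // assumed serialized key+value size
        for (int m : new int[] { 32, 16 }) {
            // A point update dirties one root-to-leaf path; copy-on-write
            // copies each dirty node in full (~m entries per node).
            int depth = (int) Math.ceil(Math.log(n) / Math.log(m));
            long bytesCopied = (long) depth * m * entryBytes;
            System.out.printf("m=%d: depth=%d, ~%d bytes copied per update%n",
                    m, depth, bytesCopied);
        }
    }

}

With these assumptions a point update copies roughly 8192 bytes at m=32 (depth 4) but about 5120 bytes at m=16 (depth 5), which is the effective-utilization argument for the smaller default.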
From: Bryan T. <tho...@us...> - 2007-02-21 20:17:34
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/rawstore In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv5461/src/java/com/bigdata/rawstore Modified Files: SimpleMemoryRawStore.java IRawStore.java SimpleFileRawStore.java Log Message: Further work supporting transactional isolation. Index: SimpleFileRawStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/rawstore/SimpleFileRawStore.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** SimpleFileRawStore.java 15 Feb 2007 20:59:21 -0000 1.3 --- SimpleFileRawStore.java 21 Feb 2007 20:17:21 -0000 1.4 *************** *** 55,59 **** import java.util.Set; ! import com.bigdata.journal.TemporaryStore; --- 55,59 ---- import java.util.Set; ! import com.bigdata.journal.TemporaryRawStore; *************** *** 63,67 **** * must code the offset into the file using {@link Addr#toLong(int, int)}. * ! * @see {@link TemporaryStore}, which provides a more solution for temporary * data that begins with the benefits of a memory-resident buffer and then * converts to a disk-based store on overflow. --- 63,67 ---- * must code the offset into the file using {@link Addr#toLong(int, int)}. * ! * @see {@link TemporaryRawStore}, which provides a more solution for temporary * data that begins with the benefits of a memory-resident buffer and then * converts to a disk-based store on overflow. *************** *** 128,131 **** --- 128,137 ---- } + public boolean isFullyBuffered() { + + return false; + + } + /** * This also releases the lock if any obtained by the constructor. *************** *** 149,153 **** } ! public ByteBuffer read(long addr, ByteBuffer dst) { if (addr == 0L) --- 155,159 ---- } ! public ByteBuffer read(long addr) { if (addr == 0L) *************** *** 165,172 **** } ! if(deleted.contains(addr)) { ! ! throw new IllegalArgumentException("Address was deleted in this session"); ! } --- 171,179 ---- } ! if (deleted.contains(addr)) { ! ! throw new IllegalArgumentException( ! "Address was deleted in this session"); ! } *************** *** 179,219 **** } ! if (dst != null && dst.remaining() >= nbytes) { ! ! // copy exactly this many bytes. ! ! dst.limit(dst.position() + nbytes); ! ! // copy into the caller's buffer. ! ! raf.getChannel().read(dst,(long)offset); ! ! // flip for reading. ! ! dst.flip(); ! ! // the caller's buffer. ! ! return dst; ! ! } else { ! ! // allocate a new buffer of the exact capacity. ! ! dst = ByteBuffer.allocate(nbytes); ! // copy the data into the buffer. ! raf.getChannel().read(dst,(long)offset); ! // flip for reading. ! dst.flip(); ! // return the buffer. ! return dst; ! } } catch (IOException ex) { --- 186,204 ---- } ! // allocate a new buffer of the exact capacity. ! ByteBuffer dst = ByteBuffer.allocate(nbytes); ! // copy the data into the buffer. ! raf.getChannel().read(dst, (long) offset); ! // flip for reading. ! dst.flip(); ! // return the buffer. ! return dst; } catch (IOException ex) { Index: IRawStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/rawstore/IRawStore.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** IRawStore.java 12 Feb 2007 21:51:08 -0000 1.3 --- IRawStore.java 21 Feb 2007 20:17:21 -0000 1.4 *************** *** 150,169 **** * offset from which the data will be read and the #of bytes to * be read. - * @param dst - * The destination buffer (optional). 
This buffer will be used if - * it has sufficient {@link ByteBuffer#remaining()} capacity. - * When used, the data will be written into the offered buffer - * starting at {@link ByteBuffer#position()} and the - * {@link ByteBuffer#position()} will be advanced by the #of - * bytes read.<br> - * When <code>null</code> or when a buffer is offered without - * sufficient {@link ByteBuffer#remaining()} capacity the - * implementation is encouraged to return a read-only slice - * containing the data or, failing that, to allocate a new buffer - * with sufficient capacity.<br> - * Note that it is not an error to offer a buffer that is too - * small - it will simply be ignored.<br> - * Note that it is not an error to offer a buffer with excess - * remaining capacity. * * @return The data read. The buffer will be flipped to prepare for reading --- 150,153 ---- *************** *** 175,179 **** * deleted). Note that the address 0L is always invalid. */ ! public ByteBuffer read(long addr, ByteBuffer dst); /** --- 159,197 ---- * deleted). Note that the address 0L is always invalid. */ ! public ByteBuffer read(long addr); ! ! // /** ! // * Read the data (unisolated). ! // * ! // * @param addr ! // * A long integer formed using {@link Addr} that encodes both the ! // * offset from which the data will be read and the #of bytes to ! // * be read. ! // * @param dst ! // * The destination buffer (optional). This buffer will be used if ! // * it has sufficient {@link ByteBuffer#remaining()} capacity. ! // * When used, the data will be written into the offered buffer ! // * starting at {@link ByteBuffer#position()} and the ! // * {@link ByteBuffer#position()} will be advanced by the #of ! // * bytes read.<br> ! // * When <code>null</code> or when a buffer is offered without ! // * sufficient {@link ByteBuffer#remaining()} capacity the ! // * implementation is encouraged to return a read-only slice ! // * containing the data or, failing that, to allocate a new buffer ! // * with sufficient capacity.<br> ! // * Note that it is not an error to offer a buffer that is too ! // * small - it will simply be ignored.<br> ! // * Note that it is not an error to offer a buffer with excess ! // * remaining capacity. ! // * ! // * @return The data read. The buffer will be flipped to prepare for reading ! // * (the position will be zero and the limit will be the #of bytes ! // * read). ! // * ! // * @exception IllegalArgumentException ! // * If the address is known to be invalid (never written or ! // * deleted). Note that the address 0L is always invalid. ! // */ ! // public ByteBuffer read(long addr, ByteBuffer dst); /** *************** *** 194,199 **** --- 212,234 ---- /** * True iff backed by stable storage. + * + * @exception IllegalStateException + * if the store is not open. */ public boolean isStable(); + + /** + * True iff the store is fully buffered (all reads are against memory). + * Implementations MAY change the value returned by this method over the + * life cycle of the store, e.g., to conserve memory a store may drop or + * decrease its buffer if it is backed by disk. + * <p> + * Note: This does not guarentee that the OS will not swap the buffer onto + * disk. + * + * @exception IllegalStateException + * if the store is not open. 
+ */ + public boolean isFullyBuffered(); /** Index: SimpleMemoryRawStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/rawstore/SimpleMemoryRawStore.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** SimpleMemoryRawStore.java 15 Feb 2007 20:59:21 -0000 1.2 --- SimpleMemoryRawStore.java 21 Feb 2007 20:17:21 -0000 1.3 *************** *** 53,57 **** import java.util.Map; ! import com.bigdata.journal.TemporaryStore; /** --- 53,57 ---- import java.util.Map; ! import com.bigdata.journal.TemporaryRawStore; /** *************** *** 59,63 **** * buffered in memory. The writes are stored in an {@link ArrayList}. * ! * @see {@link TemporaryStore}, which provides a more scalable solution for temporary * data. * --- 59,63 ---- * buffered in memory. The writes are stored in an {@link ArrayList}. * ! * @see {@link TemporaryRawStore}, which provides a more scalable solution for temporary * data. * *************** *** 130,133 **** --- 130,139 ---- } + public boolean isFullyBuffered() { + + return true; + + } + public void close() { *************** *** 141,145 **** } ! public ByteBuffer read(long addr, ByteBuffer dst) { if (addr == 0L) --- 147,151 ---- } ! public ByteBuffer read(long addr) { if (addr == 0L) *************** *** 179,207 **** } ! if(dst != null && dst.remaining()>=nbytes) { ! ! // place limit on the #of valid bytes in dst. ! ! dst.limit(dst.position() + nbytes); ! ! // copy into the caller's buffer. ! ! dst.put(b,0,b.length); ! ! // flip for reading. ! ! dst.flip(); ! ! // the caller's buffer. ! ! return dst; ! ! } else { ! ! // return a read-only view onto the data in the store. ! ! return ByteBuffer.wrap(b).asReadOnlyBuffer(); ! } } --- 185,191 ---- } ! // return a read-only view onto the data in the store. ! return ByteBuffer.wrap(b).asReadOnlyBuffer(); } |
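The net effect of the IRawStore change is that read(long) now owns buffer allocation: callers no longer offer a destination buffer, and the store returns a buffer that is flipped (position 0, limit = #of bytes read) and may be a read-only view. A minimal sketch of that contract against a toy in-memory store (this is an illustration only, not the bigdata implementation; the address packing below is a stand-in for Addr):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Toy store illustrating the one-argument read contract. The address
// packing (1-based record index in the high word, byte count in the low
// word) is illustrative only, not the com.bigdata.rawstore.Addr encoding.
class ReadContractSketch {

    private final List<byte[]> records = new ArrayList<byte[]>();

    // Write a record and return its address (never 0L).
    public long write(ByteBuffer data) {
        final byte[] b = new byte[data.remaining()];
        data.get(b);
        records.add(b);
        return ((long) records.size() << 32) | b.length;
    }

    // Read a record: the returned buffer is flipped (position == 0,
    // limit == #of bytes) and read-only, per the revised contract.
    public ByteBuffer read(long addr) {
        if (addr == 0L)
            throw new IllegalArgumentException("0L is never a valid address");
        final byte[] b = records.get((int) (addr >>> 32) - 1);
        return ByteBuffer.wrap(b).asReadOnlyBuffer();
    }
}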
From: Bryan T. <tho...@us...> - 2007-02-21 20:17:33
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv5461/src/java/com/bigdata/objndx Modified Files: IndexSegmentBuilder.java BTree.java IndexSegmentFileStore.java AbstractBTree.java IndexSegmentMerger.java NodeSerializer.java BTreeMetadata.java Log Message: Further work supporting transactional isolation. Index: IndexSegmentFileStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/IndexSegmentFileStore.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** IndexSegmentFileStore.java 12 Feb 2007 21:51:01 -0000 1.6 --- IndexSegmentFileStore.java 21 Feb 2007 20:17:21 -0000 1.7 *************** *** 117,120 **** --- 117,126 ---- } + public boolean isFullyBuffered() { + + return false; + + } + public void close() { *************** *** 153,157 **** * buffer. Otherwise this reads through to the backing file. */ ! public ByteBuffer read(long addr, ByteBuffer dst) { if (!open) --- 159,163 ---- * buffer. Otherwise this reads through to the backing file. */ ! public ByteBuffer read(long addr) { if (!open) *************** *** 164,167 **** --- 170,175 ---- final int offsetNodes = Addr.getOffset(metadata.addrNodes); + ByteBuffer dst; + if (offset >= offsetNodes && buf_nodes != null) { *************** *** 189,200 **** } else { ! /* ! * Allocate if not provided by the caller. ! */ ! if (dst == null) { ! ! dst = ByteBuffer.allocate(length); ! ! } /* --- 197,202 ---- } else { ! // Allocate buffer. ! dst = ByteBuffer.allocate(length); /* Index: IndexSegmentBuilder.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/IndexSegmentBuilder.java,v retrieving revision 1.24 retrieving revision 1.25 diff -C2 -d -r1.24 -r1.25 *** IndexSegmentBuilder.java 15 Feb 2007 22:01:18 -0000 1.24 --- IndexSegmentBuilder.java 21 Feb 2007 20:17:21 -0000 1.25 *************** *** 64,68 **** import com.bigdata.journal.Journal; ! import com.bigdata.journal.TemporaryStore; import com.bigdata.objndx.IndexSegment.CustomAddressSerializer; import com.bigdata.rawstore.Addr; --- 64,68 ---- import com.bigdata.journal.Journal; ! import com.bigdata.journal.TemporaryRawStore; import com.bigdata.objndx.IndexSegment.CustomAddressSerializer; import com.bigdata.rawstore.Addr; *************** *** 164,168 **** * a region of the {@link #outFile}. */ ! protected TemporaryStore leafBuffer; /** --- 164,168 ---- * a region of the {@link #outFile}. */ ! protected TemporaryRawStore leafBuffer; /** *************** *** 170,174 **** * a region of the {@link #outFile}. */ ! protected TemporaryStore nodeBuffer; /** --- 170,174 ---- * a region of the {@link #outFile}. */ ! protected TemporaryRawStore nodeBuffer; /** *************** *** 348,352 **** * * @todo make checksum, and record compression parameters in this ! * constructor variant * * FIXME test with and without each of these options { useChecksum, --- 348,352 ---- * * @todo make checksum, and record compression parameters in this ! * constructor variant. * * FIXME test with and without each of these options { useChecksum, *************** *** 358,362 **** this(outFile, tmpDir, btree.getEntryCount(), btree.entryIterator(), m, ! btree.nodeSer.valueSerializer, false/* useChecksum */, null/* new RecordCompressor() */, errorRate); --- 358,362 ---- this(outFile, tmpDir, btree.getEntryCount(), btree.entryIterator(), m, ! 
btree.nodeSer.valueSerializer, true/* useChecksum */, null/* new RecordCompressor() */, errorRate); *************** *** 387,391 **** * Used to serialize values in the new {@link IndexSegment}. * @param useChecksum ! * whether or not checksums are computed for nodes and leaves. * @param recordCompressor * An object to compress leaves and nodes or <code>null</code> --- 387,394 ---- * Used to serialize values in the new {@link IndexSegment}. * @param useChecksum ! * Whether or not checksums are computed for nodes and leaves. ! * The use of checksums on the read-only indices provides a check ! * for corrupt media and definitely makes the database more ! * robust. * @param recordCompressor * An object to compress leaves and nodes or <code>null</code> *************** *** 549,553 **** * index build operation. */ ! leafBuffer = new TemporaryStore(); /* --- 552,556 ---- * index build operation. */ ! leafBuffer = new TemporaryRawStore(); /* *************** *** 559,563 **** * abstraction for a disk file. */ ! nodeBuffer = plan.nnodes > 0 ? new TemporaryStore() : null; /* --- 562,566 ---- * abstraction for a disk file. */ ! nodeBuffer = plan.nnodes > 0 ? new TemporaryRawStore() : null; /* Index: BTreeMetadata.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/BTreeMetadata.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** BTreeMetadata.java 17 Feb 2007 21:34:18 -0000 1.10 --- BTreeMetadata.java 21 Feb 2007 20:17:21 -0000 1.11 *************** *** 149,154 **** public static BTreeMetadata read(IRawStore store, long addr) { ! return (BTreeMetadata) SerializerUtil.deserialize(store ! .read(addr, null)); } --- 149,153 ---- public static BTreeMetadata read(IRawStore store, long addr) { ! return (BTreeMetadata) SerializerUtil.deserialize(store.read(addr)); } Index: BTree.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/BTree.java,v retrieving revision 1.34 retrieving revision 1.35 diff -C2 -d -r1.34 -r1.35 *** BTree.java 17 Feb 2007 21:34:21 -0000 1.34 --- BTree.java 21 Feb 2007 20:17:21 -0000 1.35 *************** *** 409,413 **** hardReferenceQueue, PackedAddressSerializer.INSTANCE, valueSer, ! NodeFactory.INSTANCE, recordCompressor, true /* useChecksum */); /* --- 409,417 ---- hardReferenceQueue, PackedAddressSerializer.INSTANCE, valueSer, ! NodeFactory.INSTANCE, // ! recordCompressor, // ! // FIXME only use checksum for stores that are not fully buffered. ! true || !store.isFullyBuffered()/* useChecksum */ ! ); /* *************** *** 453,457 **** PackedAddressSerializer.INSTANCE, metadata.valueSer, NodeFactory.INSTANCE, ! metadata.recordCompressor, metadata.useChecksum); // save a reference to the immutable metadata record. --- 457,463 ---- PackedAddressSerializer.INSTANCE, metadata.valueSer, NodeFactory.INSTANCE, ! metadata.recordCompressor,// ! metadata.useChecksum // use checksum iff used on create. ! ); // save a reference to the immutable metadata record. Index: AbstractBTree.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/AbstractBTree.java,v retrieving revision 1.16 retrieving revision 1.17 diff -C2 -d -r1.16 -r1.17 *** AbstractBTree.java 15 Feb 2007 14:23:49 -0000 1.16 --- AbstractBTree.java 21 Feb 2007 20:17:21 -0000 1.17 *************** *** 133,137 **** * flag turns on some more expensive assertions. */ !
final protected boolean debug = false; /** --- 133,137 ---- * flag turns on some more expensive assertions. */ ! final protected boolean debug = DEBUG||true; /** *************** *** 163,179 **** /** ! * Leaves are added to a hard reference queue when they are created or read ! * from the store. On eviction from the queue the leaf is serialized by a ! * listener against the {@link IRawStore}. Once the leaf is no longer ! * strongly reachable its weak references may be cleared by the VM. Note ! * that leaves are evicted as new leaves are added to the hard reference ! * queue. This occurs in two situations: (1) when a new leaf is created ! * during a split of an existing leaf; and (2) when a leaf is read in from ! * the store. The minimum capacity for the hard reference queue is two (2) ! * so that a split may occur without forcing eviction of either leaf in the * split. Incremental writes basically make it impossible for the commit IO * to get "too large" where too large is defined by the size of the hard ! * reference cache. ! * * Note: The code in {@link Node#postOrderIterator(boolean)} and * {@link DirtyChildIterator} MUST NOT touch the hard reference queue since --- 163,183 ---- /** ! * Leaves (and nodes) are added to a hard reference queue when they are ! * created or read from the store. On eviction from the queue a dirty leaf ! * (or node) is serialized by a listener against the {@link IRawStore}. ! * Once the leaf is no longer strongly reachable its weak references may be ! * cleared by the VM. ! * <p> ! * Note that leaves (and nodes) are evicted as new leaves (or nodes) are ! * added to the hard reference queue. This occurs in two situations: (1) ! * when a new leaf (or node) is created during a split of an existing leaf ! * (or node); and (2) when a leaf (or node) is read in from the store. ! * <p> ! * The minimum capacity for the hard reference queue is two (2) so that a ! * split may occur without forcing eviction of either leaf (or node) in the * split. Incremental writes basically make it impossible for the commit IO * to get "too large" where too large is defined by the size of the hard ! * reference cache and help to ensure fast commit operations on the store. ! * <p> * Note: The code in {@link Node#postOrderIterator(boolean)} and * {@link DirtyChildIterator} MUST NOT touch the hard reference queue since *************** *** 198,202 **** * a fixed memory burden for the index? As it stands the #of nodes and * the #of leaves in memory can vary and leaves require much more ! * memory than nodes (for most trees). */ final protected HardReferenceQueue<PO> leafQueue; --- 202,217 ---- * a fixed memory burden for the index? As it stands the #of nodes and * the #of leaves in memory can vary and leaves require much more ! * memory than nodes (for most trees). (As an alternative, allow a ! * btree to retain some #of levels of the nodes in memory using a ! * separate node cache.) ! * ! * FIXME Verify that memory allocated for leaves or nodes on the queue is ! * reclaimed when copy-on-write is triggered since those data are no longer ! * reachable by this instance of the btree. This is essentially a memory ! * leak. Note that we can not just clear the hard reference on the queue, ! * but we can release the keys and values for the node, which constitute ! * most of its state. The node will already be marked as "!dirty" since copy ! * on write was triggered, so it will NOT be serialized when it is evicted ! * from the hard reference queue. 
*/ final protected HardReferenceQueue<PO> leafQueue; *************** *** 978,982 **** * identity assigned to the node by the store. This method is NOT recursive * and dirty children of a node will NOT be visited. ! * * Note: This will throw an exception if the backing store is read-only. * --- 993,997 ---- * identity assigned to the node by the store. This method is NOT recursive * and dirty children of a node will NOT be visited. ! * <p> * Note: This will throw an exception if the backing store is read-only. * *************** *** 1026,1039 **** } - // /* - // * Convert the keys buffer to an immutable keys buffer. The immutable - // * keys buffer is potentially much more compact. - // */ - // if( node.keys instanceof MutableKeyBuffer ) { - // - // node.keys = new ImmutableKeyBuffer((MutableKeyBuffer)node.keys); - // - // } - /* * Serialize the node or leaf onto a shared buffer. --- 1041,1044 ---- *************** *** 1099,1109 **** protected AbstractNode readNodeOrLeaf(long addr) { ! /* ! * offer the node serializer's buffer to the IRawStore. it will be used ! * iff it is large enough and the store does not prefer to return a ! * read-only slice. ! */ ! // nodeSer.buf.clear(); ! ByteBuffer tmp = store.read(addr, nodeSer._buf); assert tmp.position() == 0; assert tmp.limit() == Addr.getByteCount(addr); --- 1104,1114 ---- ! // /* ! // * offer the node serializer's buffer to the IRawStore. it will be used ! // * iff it is large enough and the store does not prefer to return a ! // * read-only slice. ! // */ ! // ByteBuffer tmp = store.read(addr, nodeSer._buf); ! ByteBuffer tmp = store.read(addr); assert tmp.position() == 0; assert tmp.limit() == Addr.getByteCount(addr); Index: NodeSerializer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/NodeSerializer.java,v retrieving revision 1.31 retrieving revision 1.32 diff -C2 -d -r1.31 -r1.32 *** NodeSerializer.java 8 Feb 2007 21:32:13 -0000 1.31 --- NodeSerializer.java 21 Feb 2007 20:17:21 -0000 1.32 *************** *** 310,317 **** /** ! * The object index is used in a single threaded context. Therefore a ! * single private instance is used to compute checksums. */ ! private static final ChecksumUtility chk = new ChecksumUtility(); public IValueSerializer getValueSerializer() { --- 310,318 ---- /** ! * A private instance is used to compute checksums for each ! * {@link AbstractBTree}. This makes it possible to have concurrent reads ! * or writes on multiple btrees that are backed by different stores. */ ! private final ChecksumUtility chk; public IValueSerializer getValueSerializer() { *************** *** 405,408 **** --- 406,411 ---- this.useChecksum = useChecksum; + + this.chk = useChecksum ? new ChecksumUtility() : null; if (initialBufferCapacity == 0) { *************** *** 420,431 **** * FIXME The capacity of this buffer is a SWAG. If it is too small then * an EOFException will be thrown. This needs to be modified to start with ! * a smaller buffer and grow as required. An alternative would be to * re-allocate this whenever _buf is resized since the compressed data * should never be larger than the original data. */ cbuf = recordCompressor != null // ! // ? ByteBuffer.allocate(Bytes.megabyte32) // ! ? ByteBuffer.allocateDirect(Bytes.megabyte32*2) // : null; --- 423,438 ---- * FIXME The capacity of this buffer is a SWAG. If it is too small then * an EOFException will be thrown. This needs to be modified to start with
* a smaller buffer and grow as required. An alternative would be to * re-allocate this whenever _buf is resized since the compressed data * should never be larger than the original data. + * + * @todo consider discarding [buf] and [cbuf] if the node serializer + * becomes inactive in order to minimize memory use. they can be + * reallocated as necessary. */ cbuf = recordCompressor != null // ! ? ByteBuffer.allocate(Bytes.megabyte32) // ! // ? ByteBuffer.allocateDirect(Bytes.megabyte32*2) // : null; *************** *** 542,546 **** /* * @todo it is extremely weird, but this assertion (and the parallel ! * one below) trips during the TestBTreeWithJournal stress tests. * this is odd because the code explicitly resets the position of * the buffer that it is manipulating as its last step in getNode() --- 549,553 ---- /* * @todo it is extremely weird, but this assertion (and the parallel ! * one below) trips during the AbstractBTreeWithJournalTestCase stress tests. * this is odd because the code explicitly resets the position of * the buffer that it is manipulating as its last step in getNode() Index: IndexSegmentMerger.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/IndexSegmentMerger.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** IndexSegmentMerger.java 9 Feb 2007 18:56:58 -0000 1.8 --- IndexSegmentMerger.java 21 Feb 2007 20:17:21 -0000 1.9 *************** *** 195,199 **** in1.nodeSer.keySerializer, in1.nodeSer.valueSerializer, ! new RecordCompressor(), useChecksum ); --- 195,199 ---- in1.nodeSer.keySerializer, in1.nodeSer.valueSerializer, ! null, //new RecordCompressor(), useChecksum );
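The ChecksumUtility API itself does not appear in this diff, so the protocol it supports can only be sketched; the JDK's Adler32 serves as a stand-in here. The idea is to compute a checksum over each serialized node or leaf when it is written, store it with the record, and recompute and compare on read so that corrupt media is detected for the read-only index segments:

import java.nio.ByteBuffer;
import java.util.zip.Adler32;

// Sketch of per-record checksums. Adler32 stands in for ChecksumUtility,
// whose API is not shown above; one instance per btree, matching the
// change from a static field to a per-instance field.
class RecordChecksumSketch {

    private final Adler32 chk = new Adler32();

    // Checksum the record without disturbing its position/limit.
    public int checksum(ByteBuffer record) {
        final ByteBuffer dup = record.duplicate();
        final byte[] b = new byte[dup.remaining()];
        dup.get(b);
        chk.reset();
        chk.update(b, 0, b.length);
        return (int) chk.getValue();
    }

    // Verify a record read from the store against its stored checksum.
    public void assertValid(ByteBuffer record, int expected) {
        if (checksum(record) != expected)
            throw new RuntimeException("Checksum error - corrupt record?");
    }
}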
From: Bryan T. <tho...@us...> - 2007-02-21 20:16:50
Update of /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv5442/src/java/com/bigdata/rdf Modified Files: AutoIncCounter.java TripleStore.java Log Message: Further work supporting transactional isolation. Index: AutoIncCounter.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/AutoIncCounter.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** AutoIncCounter.java 11 Feb 2007 17:34:22 -0000 1.3 --- AutoIncCounter.java 21 Feb 2007 20:16:43 -0000 1.4 *************** *** 141,145 **** try { ! ByteBuffer buf = store.read(addr, null); ByteBufferInputStream bbis = new ByteBufferInputStream(buf); --- 141,145 ---- try { ! ByteBuffer buf = store.read(addr); ByteBufferInputStream bbis = new ByteBufferInputStream(buf); Index: TripleStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/TripleStore.java,v retrieving revision 1.18 retrieving revision 1.19 diff -C2 -d -r1.18 -r1.19 *** TripleStore.java 19 Feb 2007 01:05:47 -0000 1.18 --- TripleStore.java 21 Feb 2007 20:16:43 -0000 1.19 *************** *** 94,98 **** * A triple store based on the <em>bigdata</em> architecture. * ! * @todo Refactor to support transactions and concurrent load/query * <p> * Conflicts arise in the bigdata-RDF store when concurrent transactions --- 94,99 ---- * A triple store based on the <em>bigdata</em> architecture. * ! * @todo Refactor to support transactions and concurrent load/query and test ! * same. * <p> * Conflicts arise in the bigdata-RDF store when concurrent transactions |
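The caller-side pattern after the read(addr, null) to read(addr) change is to stream directly from the returned buffer, as AutoIncCounter does above. A self-contained sketch (the single-long record layout is purely illustrative, and the inner stream is a minimal equivalent of the ByteBufferInputStream used in the diff rather than the real class):

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import com.bigdata.rawstore.IRawStore;

class CounterReadSketch {

    // Minimal equivalent of ByteBufferInputStream, included so the sketch
    // does not guess at that class's package or signatures.
    static final class BufferInputStream extends InputStream {
        private final ByteBuffer buf;
        BufferInputStream(ByteBuffer buf) { this.buf = buf; }
        public int read() { return buf.hasRemaining() ? (buf.get() & 0xff) : -1; }
    }

    // Read a record and deserialize it as a single long (hypothetical layout).
    static long readCounter(IRawStore store, long addr) throws IOException {
        final ByteBuffer buf = store.read(addr); // flipped: position 0, limit = #of bytes.
        return new DataInputStream(new BufferInputStream(buf)).readLong();
    }
}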
From: Bryan T. <tho...@us...> - 2007-02-21 20:16:48
Update of /cvsroot/cweb/bigdata-rdf/src/test/com/bigdata/rdf/metrics In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv5442/src/test/com/bigdata/rdf/metrics Modified Files: TestMetrics.java Log Message: Further work supporting transactional isolation. Index: TestMetrics.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/test/com/bigdata/rdf/metrics/TestMetrics.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** TestMetrics.java 11 Feb 2007 17:34:24 -0000 1.2 --- TestMetrics.java 21 Feb 2007 20:16:44 -0000 1.3 *************** *** 87,91 **** * @version $Id$ */ - public class TestMetrics extends AbstractMetricsTestCase { --- 87,90 ---- |
From: Bryan T. <tho...@us...> - 2007-02-21 20:16:39
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/util/concurrent In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv5431/src/java/com/bigdata/util/concurrent Log Message: Directory /cvsroot/cweb/bigdata/src/java/com/bigdata/util/concurrent added to the repository |
From: Bryan T. <tho...@us...> - 2007-02-20 00:27:10
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv24314/src/java/com/bigdata/journal Modified Files: TransactionServer.java Journal.java BufferMode.java Added Files: ITransactionManager.java Log Message: Worked through basic serialization of commits. Index: BufferMode.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/BufferMode.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** BufferMode.java 16 Oct 2006 14:04:41 -0000 1.4 --- BufferMode.java 20 Feb 2007 00:27:03 -0000 1.5 *************** *** 28,32 **** * </p> */ ! Transient("transient"), /** --- 28,32 ---- * </p> */ ! Transient("Transient"), /** *************** *** 46,50 **** * </p> */ ! Direct("direct"), /** --- 46,50 ---- * </p> */ ! Direct("Direct"), /** *************** *** 65,69 **** * @see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4724038 */ ! Mapped("mapped"), /** --- 65,69 ---- * @see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4724038 */ ! Mapped("Mapped"), /** *************** *** 77,81 **** * </p> */ ! Disk("disk"); private final String name; --- 77,81 ---- * </p> */ ! Disk("Disk"); private final String name; *************** *** 94,99 **** /** ! * Parse a string whose contents must be "transient", "direct", "mapped", or ! * "disk". * * @param s --- 94,99 ---- /** ! * Parse a string whose contents must be "Transient", "Direct", "Mapped", or ! * "Disk". * * @param s --- NEW FILE: ITransactionManager.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 19, 2007 */ package com.bigdata.journal; import com.bigdata.objndx.IIndex; /** * A client-facing interface for managing transaction life cycles. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public interface ITransactionManager { /** * Create a new fully-isolated read-write transaction. * * @return The transaction start time, which serves as the unique identifier * for the transaction. */ public long newTx(); /** * Create a new fully-isolated transaction. * * @param readOnly * When true, the transaction will reject writes. 
* * @return The transaction start time, which serves as the unique identifier * for the transaction. */ public long newTx(boolean readOnly); /** * Create a new read-committed transaction. The transaction will reject * writes. Any data committed by concurrent transactions will become visible * to indices isolated by this transaction (hence, "read committed"). * <p> * This provides more isolation than "read dirty" since the concurrent * transactions MUST commit before their writes become visible to a * read-committed transaction. * * @return The transaction start time, which serves as the unique identifier * for the transaction. */ public long newReadCommittedTx(); /** * Return the named index as isolated by the transaction. * * @param name * The index name. * @param ts * The transaction start time, which serves as the unique * identifier for the transaction. * * @return The isolated index. * * @exception IllegalArgumentException * if <i>name</i> is <code>null</code> * * @exception IllegalStateException * if there is no active transaction with that timestamp. */ public IIndex getIndex(String name, long ts); /** * Abort the transaction. * * @param ts * The transaction start time, which serves as the unique * identifier for the transaction. * * @exception IllegalStateException * if there is no active transaction with that timestamp. */ public void abort(long ts); /** * Commit the transaction. * * @param ts * The transaction start time, which serves as the unique * identifier for the transaction. * * @return The commit timestamp assigned to the transaction. * * @exception IllegalStateException * if there is no active transaction with that timestamp. */ public long commit(long ts); } Index: TransactionServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/TransactionServer.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** TransactionServer.java 19 Feb 2007 19:00:20 -0000 1.3 --- TransactionServer.java 20 Feb 2007 00:27:03 -0000 1.4 *************** *** 249,276 **** /** - * A blocking queue that imposes serializability on transactions. A writable - * transaction that attempts to {@link ITx#prepare()} is placed onto this - * queue. When its turn comes, it will validate its write set. - * - * FIXME this really belongs in the {@link TransactionServer} rather than - * the {@link Journal}. The {@link TransactionServer} is responsible for - * serializing transactions and coordinating 2-phase commits. It should - * accomplish this by placing transactions that issue COMMIT requests onto a - * queue that imposes serial execution of the commit protocol. The - * transaction on the head of the queue will first prepare and then commit - * as soon as it is prepared. If the transaction fails validation, then it - * must be aborted, but it could be retried by the client. The application - * should only request a COMMIT. The {@link TransactionServer} is - * responsible for issuing PREPARE requests to all resources on which writes - * have been made during the transaction and then issuing COMMIT requests to - * those resources once they have all successfully prepared. - * - * @todo a concurrent hash map for the preparing/committing transactions - * could be kept locally on the journal in order to detect violations - * of serializability by the {@link TransactionServer}.
- */ - final BlockingQueue<ITx> commitQueue = new LinkedBlockingQueue<ITx>(); - - /** * Map containing metadata for active transactions. */ --- 249,252 ---- Index: Journal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Journal.java,v retrieving revision 1.53 retrieving revision 1.54 diff -C2 -d -r1.53 -r1.54 *** Journal.java 19 Feb 2007 19:00:20 -0000 1.53 --- Journal.java 20 Feb 2007 00:27:03 -0000 1.54 *************** *** 53,56 **** --- 53,60 ---- import java.util.Map; import java.util.Properties; + import java.util.concurrent.Callable; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; import org.apache.log4j.Level; *************** *** 159,183 **** * </ol> * - * FIXME The notion of a committed state needs to be captured by a persistent - * structure in the journal until (a) there are no longer any active - * transactions that can read from that committed state; and (b) the slots - * allocated to that committed state have been released on the journal. Those - * commit states need to be locatable on the journal, suggesting a record - * written by PREPARE and finalized by COMMIT. - * - * @todo Work out protocol for shutdown with the single-threaded journal server. - * - * @todo Normal transaction operations need to be interleaved with operations to - * migrate committed data to the read-optimized database; with operations - * to logically delete data versions (and their slots) on the journal once - * those version are no longer readable by any active transaction; and - * with operations to compact the journal (extending can occur during - * normal transaction operations). One approach is to implement - * thread-checking using thread local variables or the ability to assign - * the journal to a thread and then hand off the journal to the thread for - * the activity that needs to be run, returning control to the normal - * transaction thread on (or shortly after) interrupt or when the - * operation is finished. - * * @todo There is a dependency in a distributed database architecture on * transaction begin time. A very long running transaction could force the --- 163,166 ---- *************** *** 206,211 **** * the same object back when you ask for an isolated named index). * - * FIXME Write test suites for the {@link TransactionServer}. - * * @todo I need to revisit the assumptions for very large objects in the face of * the recent / planned redesign. I expect that using an index with a key --- 189,192 ---- *************** *** 214,218 **** * directed to a journal using a disk-only mode. */ ! public class Journal implements IJournal { /** --- 195,199 ---- * directed to a journal using a disk-only mode. */ ! public class Journal implements IJournal, ITransactionManager { /** *************** *** 253,257 **** * The service used to generate commit timestamps. * ! * @todo paramterize using {@link Options} so that we can resolve a * low-latency service for use with a distributed database commit * protocol. --- 234,238 ---- * The service used to generate commit timestamps. * ! * @todo parameterize using {@link Options} so that we can resolve a * low-latency service for use with a distributed database commit * protocol. *************** *** 318,334 **** /** - * A hash map containing all active transactions. A transaction that is - * preparing will be in this collection until it has either successfully - * prepared or aborted. 
- */ - final Map<Long, ITx> activeTx = new HashMap<Long, ITx>(); - - /** - * A hash map containing all transactions that have prepared but not yet - * either committed or aborted. - */ - final Map<Long, ITx> preparedTx = new HashMap<Long, ITx>(); - - /** * Create or open a journal. * --- 299,302 ---- *************** *** 710,728 **** /** ! * Shutdown the journal politely. ! * ! * @exception IllegalStateException ! * if there are active transactions. ! * @exception IllegalStateException ! * if there are prepared transactions. ! * ! * @todo Workout protocol for shutdown of the journal, including forced ! * shutdown when there are active or prepar(ed|ing) transactions, ! * timeouts on transactions during shutdown, notification of abort for ! * transactions that do not complete in a timely manner, and ! * survivability of prepared transactions across restart. Reconcile ! * the semantics of this method with those declared by the raw store ! * interface, probably by declaring a variant that accepts parameters ! * specifying how to handle the shutdown (immediate vs wait). */ public void shutdown() { --- 678,684 ---- /** ! * Shutdown the journal politely. Active transactions and transactions ! * pending commit will run to completion, but no new transactions will be ! * accepted. */ public void shutdown() { *************** *** 730,753 **** assertOpen(); ! final int nactive = activeTx.size(); ! ! if (nactive > 0) { ! ! throw new IllegalStateException("There are " + nactive ! + " active transactions"); ! ! } ! ! final int nprepare = preparedTx.size(); ! ! if (nprepare > 0) { ! ! throw new IllegalStateException("There are " + nprepare ! + " prepared transactions."); ! ! } // close immediately. close(); } --- 686,705 ---- assertOpen(); ! final long begin = System.currentTimeMillis(); ! ! log.warn("#active="+activeTx.size()+", shutting down..."); ! ! /* ! * allow all pending tasks to complete, but no new tasks will be ! * accepted. ! */ ! commitService.shutdown(); // close immediately. close(); + + final long elapsed = System.currentTimeMillis() - begin; + + log.warn("Journal is shutdown: elapsed="+elapsed); } *************** *** 760,763 **** --- 712,718 ---- assertOpen(); + // force the commit thread to quit immediately. + commitService.shutdownNow(); + _bufferStrategy.close(); *************** *** 1362,1373 **** /* ! * transaction support. */ /** ! * Create a new fully-isolated read-write transaction. * ! * @see #newTx(boolean), to which this method delegates its implementation. */ public long newTx() { --- 1317,1354 ---- /* ! * ITransactionManager and friends. ! * ! * @todo refactor into a service. provide an implementation that supports ! * only a single Journal resource and an implementation that supports a ! * scale up/out architecture. the journal should resolve the service using ! * JINI. the timestamp service should probably be co-located with the ! * transaction service. */ /** ! * A hash map containing all active transactions. A transaction that is ! * preparing will be in this collection until it has either successfully ! * prepared or aborted. ! */ ! final Map<Long, ITx> activeTx = new HashMap<Long, ITx>(); ! ! /** ! * A hash map containing all transactions that have prepared but not yet ! * either committed or aborted. * ! * @todo this is probably useless. A transaction will be in this map only ! * while it is actively committing. ! */ ! final Map<Long, ITx> preparedTx = new HashMap<Long, ITx>(); ! ! /** ! * A thread that imposes serializability on transactions. A writable ! 
* transaction that attempts to {@link #commit()} is added as a ! * {@link CommitTask} and queued for execution by this thread. When its turn ! * comes, it will validate its write set and commit iff validation succeeds. */ + final ExecutorService commitService = Executors + .newSingleThreadExecutor(Executors.defaultThreadFactory()); + public long newTx() { *************** *** 1376,1393 **** } - /** - * Create a new fully-isolated transaction. - * - * @param readOnly - * When true, the transaction will reject writes. - * - * @todo This method supports transactions in a non-distributed database in - * which there is a centralized {@link Journal} that handles all - * concurrency control. There needs to be a {@link TransactionServer} - * that starts transactions. The {@link JournalServer} should summon a - * transaction object them into being on a {@link Journal} iff - * operations isolated by that transaction are required on that - * {@link Journal}. - */ public long newTx(boolean readOnly) { --- 1357,1360 ---- *************** *** 1397,1410 **** } - /** - * Create a new read-committed transaction. - * - * @return A transaction that will reject writes. Any committed data will be - * visible to indices isolated by this transaction. - * - * @todo implement read-committed transaction support. - * - * @see #newTx(boolean) - */ public long newReadCommittedTx() { --- 1364,1367 ---- *************** *** 1413,1432 **** } - /** - * Return the named index as isolated by the transaction. - * - * @param name - * The index name. - * @param ts - * The start time of the transaction. - * - * @return The isolated index. - * - * @exception IllegalArgumentException - * if <i>name</i> is <code>null</code> - * - * @exception IllegalStateException - * if there is no active transaction with that timestamp. - */ public IIndex getIndex(String name, long ts) { --- 1370,1373 ---- *************** *** 1452,1462 **** } - /** - * Commit the transaction on the journal. - * - * @param ts The transaction start time. - * - * @return The commit timestamp assigned to the transaction. - */ public long commit(long ts) { --- 1393,1396 ---- *************** *** 1465,1472 **** if (tx == null) throw new IllegalArgumentException("No such tx: " + ts); ! tx.prepare(); ! return tx.commit(); } --- 1399,1471 ---- if (tx == null) throw new IllegalArgumentException("No such tx: " + ts); + + if(tx.isReadOnly()) { ! tx.prepare(); ! return tx.commit(); ! ! } ! ! try { ! ! long commitTime = commitService.submit(new CommitTask(tx)).get(); ! ! if(DEBUG) { ! ! log.debug("committed: startTime="+tx.getStartTimestamp()+", commitTime="+commitTime); ! ! } ! ! return commitTime; ! ! } catch(InterruptedException ex) { ! ! // interrupted, perhaps during shutdown. ! throw new RuntimeException(ex); ! ! } catch(ExecutionException ex) { ! ! Throwable cause = ex.getCause(); ! ! if(cause instanceof ValidationError) { ! ! throw (ValidationError) cause; ! ! } ! ! // this is an unexpected error. ! throw new RuntimeException(cause); ! ! } ! ! } ! ! /** ! * Task validates and commits a transaction when it is run by the ! * {@link Journal#commitService}. ! * ! * @author <a href="mailto:tho...@us...">Bryan Thompson</a> ! * @version $Id$ ! */ ! private static class CommitTask implements Callable<Long> { ! ! private final ITx tx; ! ! public CommitTask(ITx tx) { ! ! assert tx != null; ! ! this.tx = tx; ! ! } ! ! public Long call() throws Exception { ! ! tx.prepare(); ! ! return tx.commit(); ! ! 
} } *************** *** 1480,1483 **** --- 1479,1485 ---- * * @throws IllegalStateException + * + * @todo test for transactions that have already been completed? that would + * represent a protocol error in the transaction manager service. */ protected void activateTx(ITx tx) throws IllegalStateException { |
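Taken together, the ITransactionManager API and the single-threaded commit service leave clients a very small surface: obtain a start time, operate on isolated index views, and commit, retrying on ValidationError. A usage sketch (placed in the com.bigdata.journal package for brevity; it assumes an index named "abc" was already registered and committed, as in the test fixtures):

package com.bigdata.journal;

import com.bigdata.objndx.IIndex;

// Clients hold only the transaction start time (a long), never the ITx
// object. A write-write conflict surfaces as a ValidationError from
// commit(), after which the tx is aborted and may simply be retried.
class TxClientSketch {

    static long insertWithRetry(Journal journal, byte[] key, byte[] val) {
        for (int attempt = 0; attempt < 2; attempt++) {
            final long ts = journal.newTx(); // fully-isolated read-write tx.
            final IIndex ndx = journal.getIndex("abc", ts); // isolated view.
            ndx.insert(key, val);
            try {
                // Queued on the single-threaded commit service; returns
                // the commit timestamp assigned to the transaction.
                return journal.commit(ts);
            } catch (ValidationError ex) {
                // Lost a write-write conflict during validation; the tx
                // was aborted, so retry with a fresh transaction.
            }
        }
        throw new RuntimeException("Could not commit after retry");
    }
}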
From: Bryan T. <tho...@us...> - 2007-02-20 00:27:09
Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv24314/src/test/com/bigdata/journal Modified Files: StressTestConcurrent.java TestJournalBasics.java TestConcurrentSchedules.java Log Message: Worked through basic serialization of commits. Index: TestJournalBasics.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestJournalBasics.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** TestJournalBasics.java 19 Feb 2007 01:05:39 -0000 1.9 --- TestJournalBasics.java 20 Feb 2007 00:27:03 -0000 1.10 *************** *** 112,119 **** // @todo tests of read-committed transactions. suite.addTestSuite( TestReadCommittedTx.class ); - // @todo tests of concurrent schedules and conflict detection. - suite.addTestSuite( TestConcurrentSchedules.class ); // todo tests of write-write conflict resolution. suite.addTestSuite(TestConflictResolution.class); /* --- 112,119 ---- // @todo tests of read-committed transactions. suite.addTestSuite( TestReadCommittedTx.class ); // todo tests of write-write conflict resolution. suite.addTestSuite(TestConflictResolution.class); + // @todo tests of concurrent schedules and conflict detection. + suite.addTestSuite( TestConcurrentSchedules.class ); /* Index: StressTestConcurrent.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/StressTestConcurrent.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** StressTestConcurrent.java 19 Feb 2007 19:00:18 -0000 1.2 --- StressTestConcurrent.java 20 Feb 2007 00:27:03 -0000 1.3 *************** *** 48,51 **** --- 48,53 ---- package com.bigdata.journal; + import java.io.File; + import java.io.IOException; import java.util.Collection; import java.util.HashSet; *************** *** 87,90 **** --- 89,104 ---- Journal journal = new Journal(properties); + + if(journal.getBufferStrategy() instanceof MappedBufferStrategy) { + + /* + * @todo the mapped buffer strategy has become cpu bound w/o + * termination when used with concurrent clients - this needs to be + * looked into further. + */ + + fail("Mapped buffer strategy has problem with concurrency"); + + } final String name = "abc"; *************** *** 111,115 **** * shortcut the prepare/commit for those transactions as well. */ ! doConcurrentClientTest(journal,name,1,100); } finally { --- 125,129 ---- * shortcut the prepare/commit for those transactions as well. */ ! doConcurrentClientTest(journal,name,20,100); } finally { *************** *** 141,145 **** * index? */ ! public void doConcurrentClientTest(Journal journal, String name, int nclients, int ntx) throws InterruptedException { --- 155,160 ---- * index? */ ! static public void doConcurrentClientTest(Journal journal, String name, ! int nclients, int ntx) throws InterruptedException { *************** *** 159,181 **** /* ! * @todo this will fail since we are not serializing transactions in ! * prepare->commit. */ - List<Future<Long>> results = executorService.invokeAll(tasks, timeout, TimeUnit.SECONDS); ! /* ! * @todo validate results - all execute, valid commit times, no errors. ! * ! * @todo if write-write conflicts can result, then those should be ! * acceptable errors and the task could just be retried. ! */ Iterator<Future<Long>> itr = results.iterator(); while(itr.hasNext()) { Future<Long> future = itr.next(); ! 
assertFalse(future.isCancelled()); try { --- 174,203 ---- /* ! * Run the M transactions on N clients. */ ! final long begin = System.currentTimeMillis(); ! ! List<Future<Long>> results = executorService.invokeAll(tasks, timeout, TimeUnit.SECONDS); + final long elapsed = System.currentTimeMillis() - begin; + Iterator<Future<Long>> itr = results.iterator(); + int nfailed = 0; // #of transactions that failed validation. + int ncommitted = 0; // #of transactions that successfully committed. + int nuncommitted = 0; // #of transactions that did not complete in time. + while(itr.hasNext()) { Future<Long> future = itr.next(); ! if(future.isCancelled()) { ! ! nuncommitted++; ! ! continue; ! ! } try { *************** *** 183,191 **** assertNotSame(0L,future.get()); } catch(ExecutionException ex ) { ! // @todo validation errors should be allowed here. ! fail("Not expecting: "+ex, ex); } --- 205,223 ---- assertNotSame(0L,future.get()); + ncommitted++; + } catch(ExecutionException ex ) { ! // Validation errors are allowed and counted as aborted txs. ! if(ex.getCause() instanceof ValidationError) { ! ! nfailed++; ! ! } else { ! ! fail("Not expecting: "+ex, ex); ! ! } } *************** *** 193,196 **** --- 225,233 ---- } + System.err.println("#clients=" + nclients + ", ntx=" + ntx + + ", ncommitted=" + ncommitted + ", nfailed=" + nfailed + + ", nuncommitted=" + nuncommitted + " in " + elapsed + "ms (" + + ncommitted * 1000 / elapsed + "tps)"); + } *************** *** 250,253 **** --- 287,337 ---- } + + /** + * FIXME work through tps rates for each of the buffer modes + * + * FIXME make sure that the stress test terminates properly and that the + * journal shuts down smoothly. + */ + public static void main(String[] args) throws IOException, InterruptedException { + + Properties properties = new Properties(); + + properties.setProperty(Options.BUFFER_MODE, BufferMode.Transient.toString()); + // properties.setProperty(Options.BUFFER_MODE, BufferMode.Direct.toString()); + // properties.setProperty(Options.BUFFER_MODE, BufferMode.Mapped.toString()); + // properties.setProperty(Options.BUFFER_MODE, BufferMode.Disk.toString()); + + properties.setProperty(Options.SEGMENT, "0"); + + File file = File.createTempFile("bigdata", ".jnl"); + + file.deleteOnExit(); + + if(!file.delete()) fail("Could not remove temp file before test"); + + properties.setProperty(Options.FILE, file.toString()); + + Journal journal = new Journal(properties); + + final String name = "abc"; + + { + + journal.registerIndex(name, new UnisolatedBTree(journal)); + + journal.commit(); + + } + + final int nclients = 20; + + final int ntx = 100; + + doConcurrentClientTest(journal, name, nclients, ntx); + + journal.shutdown(); + + } } Index: TestConcurrentSchedules.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestConcurrentSchedules.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TestConcurrentSchedules.java 17 Feb 2007 21:34:12 -0000 1.1 --- TestConcurrentSchedules.java 20 Feb 2007 00:27:03 -0000 1.2 *************** *** 51,60 **** * Tests of concurrent transaction schedules designed to look for correct * execution of non-conflicting concurrent schedules and correct detection of ! * conflicts. This is the first test suite that considers concurrent schedules. * <p> * Note that only write-write conflicts may occur since the {@link Journal} uses * an MVCC style concurrency control algorithm. * !
* @todo refactor test and schedule support from dbcache. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> --- 51,63 ---- * Tests of concurrent transaction schedules designed to look for correct * execution of non-conflicting concurrent schedules and correct detection of ! * conflicts. * <p> * Note that only write-write conflicts may occur since the {@link Journal} uses * an MVCC style concurrency control algorithm. * ! * @todo refactor test and schedule support from dbcache. there is perhaps no * need for the kinds of tests that I developed for dbcache since I had to * put into place infrastructure there designed to test a locking mechanism * that is simply not needed here. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
From: Bryan T. <tho...@us...> - 2007-02-19 19:00:58
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/isolation In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv30034/src/java/com/bigdata/isolation Modified Files: IsolatedBTree.java Log Message: Some more work on transaction support, including an api change that makes ITx an internal api and presents just a timestamp to clients. This removes the potential for misuse of prepare/commit by the clients and sets us up for an ITransactionManager api. Index: IsolatedBTree.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/isolation/IsolatedBTree.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** IsolatedBTree.java 19 Feb 2007 01:05:39 -0000 1.5 --- IsolatedBTree.java 19 Feb 2007 19:00:20 -0000 1.6 *************** *** 115,118 **** --- 115,121 ---- * slick way to realize the proper semantics for this interface on a fused * view of the write set and the immutable index isolated by this class? + * + * FIXME I have not finished working through the fused view support for this + * class. */ public class IsolatedBTree extends UnisolatedBTree implements IIsolatableIndex, *************** *** 188,192 **** throw new IllegalArgumentException(); ! Value value = (Value) super.lookup(key); if (value == null) { --- 191,195 ---- throw new IllegalArgumentException(); ! Value value = super.getValue(key); if (value == null) { *************** *** 218,222 **** throw new IllegalArgumentException(); ! Value value = (Value) super.getValue((byte[])key); if (value == null) { --- 221,225 ---- throw new IllegalArgumentException(); ! Value value = super.getValue((byte[])key); if (value == null) { *************** *** 237,245 **** /** ! * Remove the key from the write set (does not write through to the isolated ! * index). */ public Object remove(Object key) { return super.remove(key); --- 240,283 ---- /** ! * Remove the key from the write set (does not write through to the ! * unisolated index). If the key is not in the write set, then we check the ! * unisolated index. If the key is found there, then we add a delete marker ! * to the isolated index. */ public Object remove(Object key) { + if (key == null) + throw new IllegalArgumentException(); + + // check the isolated index. + Value value = super.getValue((byte[])key); + + if (value == null) { + + /* + * The key does not exist in the isolated index, so now we test to + * see if the key exists in the unisolated index. if it does then we + * need to write a delete marker in the isolated index. + */ + value = src.getValue((byte[]) key); + + if(value==null||value.deleted) return null; + + super.remove(key); + + /* + * return the value from the unisolated index since that is what was + * deleted. + */ + return value.datum; + + } + + if (value.deleted) { + + return null; + + } + return super.remove(key); *************** *** 536,551 **** /* * IFF there was a pre-existing version in the global scope then ! * we clear the 'currentVersionSlots' in the entry in the global ! * scope and mark the index entry as dirty. The global scope ! * will now recognized the persistent identifier as 'deleted'. */ if (globalScope.contains(key)) { - /* - * Mark the entry in the unisolated index as deleted. - */ - // globalScope.insert(key, new Value( - // entry.nextVersionCounter(), true, null)); globalScope.remove(key); --- 574,583 ---- /* * IFF there was a pre-existing version in the global scope then ! * we remove the key from the global scope so that it will now ! 
* have a "delete marker" for this key. */ if (globalScope.contains(key)) { globalScope.remove(key); *************** *** 564,569 **** * Copy the entry down onto the global scope. */ ! // globalScope.insert(key, new Value(entry.nextVersionCounter(), ! // false, entry.datum)); globalScope.insert(key,entry.datum); --- 596,600 ---- * Copy the entry down onto the global scope. */ ! globalScope.insert(key,entry.datum); |
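The delete-marker rule this change completes reduces to a small decision procedure: a remove is resolved against the transaction's write set first, and only consults the unisolated ground state when the key is absent there; deleting a key that exists only in the ground state records a marker in the write set rather than touching the ground state, so the delete takes effect (or is discarded) at commit. A toy model under those assumptions (maps instead of btrees; a null value plays the role of the deleted Value):

import java.util.HashMap;
import java.util.Map;

// Toy model of IsolatedBTree#remove. The real code stores Value objects
// with version counters; here a null value in the write set is the
// delete marker.
class DeleteMarkerSketch {

    final Map<String, String> groundState = new HashMap<String, String>();
    final Map<String, String> writeSet = new HashMap<String, String>();

    String remove(String key) {
        if (writeSet.containsKey(key)) {
            final String v = writeSet.get(key);
            if (v == null) return null;  // already deleted in this tx.
            writeSet.put(key, null);     // overwrite with a delete marker.
            return v;
        }
        final String v = groundState.get(key);
        if (v == null) return null;      // never existed in either scope.
        writeSet.put(key, null);         // marker shadows the ground state.
        return v;                        // the version logically deleted.
    }
}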
From: Bryan T. <tho...@us...> - 2007-02-19 19:00:58
Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv30034/src/test/com/bigdata/journal Modified Files: TestTxRunState.java TestTransactionServer.java TestRootBlockView.java TestReadOnlyTx.java StressTestConcurrent.java TestConflictResolution.java TestTx.java Log Message: Some more work on transaction support, including an api change that makes ITx an internal api and presents just a timestamp to clients. This removes the potential for misuse of prepare/commit by the clients and sets us up for an ITransactionManager api. Index: TestConflictResolution.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestConflictResolution.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** TestConflictResolution.java 19 Feb 2007 01:05:39 -0000 1.2 --- TestConflictResolution.java 19 Feb 2007 19:00:18 -0000 1.3 *************** *** 56,59 **** --- 56,102 ---- /** * Tests of write-write conflict resolution. + * <p> + * Write-write conflicts either result in successful reconciliation via + * state-based conflict resolution or an abort of the transaction that is + * validating. The tests in this suite verify that write-write conflicts can be + * detected and provide versions of those tests where the conflict can and + * cannot be validated and verify the end state in each case. + * <p> + * State-based validation requires transparency at the object level, including + * the ability to deserialize versions into objects, to compare objects for + * consistency, to merge data into the most recent version where possible and + * according to data type specific rules, and to destructively merge objects + * when the conflict arises on <em>identity</em> rather than state. + * <p> + * An example of an identity-based conflict is when two objects are created that + * represent URIs in an RDF graph. Since the lexicon for an RDF graph generally + * requires uniqueness, those objects must be merged into a single object since + * they have the same identity. For an RDFS store validation on the lexicon or + * statements ALWAYS succeeds since they are always consistent. + * + * @todo Verify that we can handle the bank account example (this is state-based + * conflict resolution, requiring that we carry a richer + * representation of state in the objects and then use that additional + * state to validate and resolve some kinds of data type specific + * conflicts). + * + * @todo Do tests that verify that multiple conflicts are correctly detected and + * resolved. + * + * @todo Verify that we can handle examples in which we have to traverse an + * object graph during conflict resolution. (Really, two object graphs: a + * readOnly view of the ground state for the transaction and the + * readWriteTx that we are trying to validate.) This last issue is by far + * the trickiest and may require support for concurrent modification of + * the transaction indices during traversal (or more simply of reading from + * a fused view of the resolved and unconflicting entries in the + * read-write tx index views). + * + * @todo Destructive merging of objects in a graph can propagate changes to other + * objects. Unless the data structures provide for encapsulation, e.g., by + * defining objects that serve as collectors for the link set members in a + * given segment, that change could propagate beyond the segment in which + * it is detected.
If changes can propagate in that manner then care MUST + * be taken to ensure that validation terminates. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> *************** *** 111,117 **** */ ! ITx tx1 = journal.newTx(); ! ITx tx2 = journal.newTx(); /* --- 154,160 ---- */ ! final long tx1 = journal.newTx(); ! final long tx2 = journal.newTx(); /* *************** *** 120,130 **** */ ! tx1.getIndex(name).insert(k1, v1a); ! tx2.getIndex(name).insert(k1, v1b); ! tx1.prepare(); ! ! tx1.commit(); /* --- 163,171 ---- */ ! journal.getIndex(name,tx1).insert(k1, v1a); ! journal.getIndex(name,tx2).insert(k1, v1b); ! journal.commit(tx1); /* *************** *** 134,143 **** assertEquals(v1a,(byte[])journal.getIndex(name).lookup(k1)); try { ! tx2.prepare(); fail("Expecting: "+ValidationError.class); } catch(ValidationError ex) { System.err.println("Ignoring expected exception: "+ex); ! assertTrue(tx2.isAborted()); } --- 175,186 ---- assertEquals(v1a,(byte[])journal.getIndex(name).lookup(k1)); + final ITx tmp = journal.getTx(tx2); + try { ! journal.commit(tx2); fail("Expecting: "+ValidationError.class); } catch(ValidationError ex) { System.err.println("Ignoring expected exception: "+ex); ! assertTrue(tmp.isAborted()); } *************** *** 188,194 **** */ ! ITx tx1 = journal.newTx(); ! ITx tx2 = journal.newTx(); /* --- 231,237 ---- */ ! final long tx1 = journal.newTx(); ! final long tx2 = journal.newTx(); /* *************** *** 197,207 **** */ ! tx1.getIndex(name).insert(k1, v1a); ! tx2.getIndex(name).insert(k1, v1b); ! tx1.prepare(); ! ! tx1.commit(); /* --- 240,248 ---- */ ! journal.getIndex(name,tx1).insert(k1, v1a); ! journal.getIndex(name,tx2).insert(k1, v1b); ! journal.commit(tx1); /* *************** *** 211,220 **** assertEquals(v1a,(byte[])journal.getIndex(name).lookup(k1)); ! tx2.prepare(); ! ! // @todo the indices should probably become read only at this point. ! assertEquals(v1c,(byte[])tx2.getIndex(name).lookup(k1)); ! ! tx2.commit(); /* --- 252,256 ---- assertEquals(v1a,(byte[])journal.getIndex(name).lookup(k1)); ! journal.commit(tx2); /* *************** *** 228,293 **** } ! /** ! * The concurrency control algorithm must not permit two transactions to ! * prepare at the same time since that violates the basic rules of ! * serializability. ! * ! * @todo javadoc and move into schedules test suite or its own test suite. ! */ ! public void test_serializability() { ! ! Properties properties = getProperties(); ! ! Journal journal = new Journal(properties); ! ! String name = "abc"; ! ! final byte[] k1 = new byte[] { 1 }; ! ! final byte[] v1a = new byte[] { 1 }; ! final byte[] v1b = new byte[] { 2 }; ! ! { ! ! /* ! * register an index and commit the journal. ! */ ! ! journal.registerIndex(name, new UnisolatedBTree(journal)); ! ! journal.commit(); ! ! } ! ! /* ! * Create two transactions. ! */ ! ! ITx tx1 = journal.newTx(); ! ! ITx tx2 = journal.newTx(); ! ! /* ! * Write a value under the same key on the same index in both ! * transactions. ! */ ! ! tx1.getIndex(name).insert(k1, v1a); ! ! tx2.getIndex(name).insert(k1, v1b); ! ! tx1.prepare(); ! ! try { ! tx2.prepare(); ! fail("Expecting: "+IllegalStateException.class); ! } catch(IllegalStateException ex) { ! System.err.println("Ignoring expected exception: "+ex); ! } ! ! journal.close(); ! ! } ! /** * Helper class used to resolve a predicted conflict to a known value. --- 264,329 ---- } ! // /** ! // * The concurrency control algorithm must not permit two transactions to ! 
// * prepare at the same time since that violates the basic rules of ! // * serializability. ! // * ! // * @todo javadoc and move into schedules test suite or its own test suite. ! // */ ! // public void test_serializability() { ! // ! // Properties properties = getProperties(); ! // ! // Journal journal = new Journal(properties); ! // ! // String name = "abc"; ! // ! // final byte[] k1 = new byte[] { 1 }; ! // ! // final byte[] v1a = new byte[] { 1 }; ! // final byte[] v1b = new byte[] { 2 }; ! // ! // { ! // ! // /* ! // * register an index and commit the journal. ! // */ ! // ! // journal.registerIndex(name, new UnisolatedBTree(journal)); ! // ! // journal.commit(); ! // ! // } ! // ! // /* ! // * Create two transactions. ! // */ ! // ! // final long tx1 = journal.newTx(); ! // ! // final long tx2 = journal.newTx(); ! // ! // /* ! // * Write a value under the same key on the same index in both ! // * transactions. ! // */ ! // ! // journal.getIndex(name,tx1).insert(k1, v1a); ! // ! // journal.getIndex(name,tx2).insert(k1, v1b); ! // ! // tx1.prepare(); ! // ! // try { ! // tx2.prepare(); ! // fail("Expecting: "+IllegalStateException.class); ! // } catch(IllegalStateException ex) { ! // System.err.println("Ignoring expected exception: "+ex); ! // } ! // ! // journal.close(); ! // ! // } ! // /** * Helper class used to resolve a predicted conflict to a known value. Index: TestTxRunState.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestTxRunState.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** TestTxRunState.java 17 Feb 2007 21:34:12 -0000 1.2 --- TestTxRunState.java 19 Feb 2007 19:00:18 -0000 1.3 *************** *** 51,58 **** --- 51,65 ---- import java.util.Properties; + import com.bigdata.isolation.UnisolatedBTree; + import com.bigdata.objndx.IIndex; + /** * Test suite for the state machine governing the transaction {@link RunState} * transitions. * + * @todo refactor to test both {@link Tx} and the as yet to be written + * read-committed transaction class (ideally they will share a base class + * which encapsulates the state transaction mechanisms). + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ *************** *** 439,441 **** --- 446,706 ---- } + /** + * Verifies that access to, and operations on, a named indices is denied + * after a PREPARE. + * + * @throws IOException + */ + public void test_runStateMachine_prepared_correctRejection() + throws IOException { + + final Properties properties = getProperties(); + + Journal journal = new Journal(properties); + + String name = "abc"; + + { + + journal.registerIndex(name, new UnisolatedBTree(journal)); + + journal.commit(); + + } + + final long tx0 = journal.newTx(); + + ITx tmp = journal.getTx(tx0); + + assertNotNull(tmp); + + IIndex ndx = journal.getIndex(name,tx0); + + assertNotNull(ndx); + + // commit the journal. + journal.commit(tx0); + + /* + * Verify that you can not access a named index after 'prepare'. + */ + try { + journal.getIndex(name,tx0); + fail("Expecting: " + IllegalStateException.class); + } catch (IllegalStateException ex) { + System.err.println("Ignoring expected exception: " + ex); + } + + /* + * Verify that operations on an pre-existing index reference are now + * denied. 
+ */ + try { + ndx.lookup(new byte[] { 1 }); + fail("Expecting: " + IllegalStateException.class); + } catch (IllegalStateException ex) { + System.err.println("Ignoring expected exception: " + ex); + } + try { + ndx.contains(new byte[] { 1 }); + fail("Expecting: " + IllegalStateException.class); + } catch (IllegalStateException ex) { + System.err.println("Ignoring expected exception: " + ex); + } + try { + ndx.remove(new byte[] { 1 }); + fail("Expecting: " + IllegalStateException.class); + } catch (IllegalStateException ex) { + System.err.println("Ignoring expected exception: " + ex); + } + try { + ndx.insert(new byte[] { 1 }, new byte[] { 2 }); + fail("Expecting: " + IllegalStateException.class); + } catch (IllegalStateException ex) { + System.err.println("Ignoring expected exception: " + ex); + } + + assertFalse(tmp.isActive()); + assertTrue(tmp.isPrepared()); + assertFalse(tmp.isAborted()); + assertFalse(tmp.isCommitted()); + assertFalse(tmp.isComplete()); + + assertFalse(journal.activeTx.containsKey(tmp.getStartTimestamp())); + assertFalse(journal.preparedTx.containsKey(tmp.getStartTimestamp())); + assertNull(journal.getTx(tmp.getStartTimestamp())); + + journal.close(); + + } + + // /** + // * Verifies that access to, and operations on, a named indices is denied + // * after an ABORT. + // * + // * @throws IOException + // */ + // public void test_runStateMachine_aborted_correctRejection() + // throws IOException { + // + // final Properties properties = getProperties(); + // + // Journal journal = new Journal(properties); + // + // String name = "abc"; + // + // { + // + // journal.registerIndex(name, new UnisolatedBTree(journal)); + // + // journal.commit(); + // + // } + // + // ITx tx0 = journal.newTx(); + // + // IIndex ndx = tx0.getIndex(name); + // + // assertNotNull(ndx); + // + // tx0.abort(); + // + // /* + // * Verify that you can not access a named index. + // */ + // try { + // tx0.getIndex(name); + // fail("Expecting: " + IllegalStateException.class); + // } catch (IllegalStateException ex) { + // System.err.println("Ignoring expected exception: " + ex); + // } + // + // /* + // * Verify that operations on an pre-existing index reference are now + // * denied. 
+ // */ + // try { + // ndx.lookup(new byte[] { 1 }); + // fail("Expecting: " + IllegalStateException.class); + // } catch (IllegalStateException ex) { + // System.err.println("Ignoring expected exception: " + ex); + // } + // try { + // ndx.contains(new byte[] { 1 }); + // fail("Expecting: " + IllegalStateException.class); + // } catch (IllegalStateException ex) { + // System.err.println("Ignoring expected exception: " + ex); + // } + // try { + // ndx.remove(new byte[] { 1 }); + // fail("Expecting: " + IllegalStateException.class); + // } catch (IllegalStateException ex) { + // System.err.println("Ignoring expected exception: " + ex); + // } + // try { + // ndx.insert(new byte[] { 1 }, new byte[] { 2 }); + // fail("Expecting: " + IllegalStateException.class); + // } catch (IllegalStateException ex) { + // System.err.println("Ignoring expected exception: " + ex); + // } + // + // assertFalse(tx0.isActive()); + // assertFalse(tx0.isPrepared()); + // assertTrue (tx0.isAborted()); + // assertFalse(tx0.isCommitted()); + // assertTrue (tx0.isComplete()); + // + // assertFalse(journal.activeTx.containsKey(tx0.getStartTimestamp())); + // assertFalse(journal.preparedTx.containsKey(tx0.getStartTimestamp())); + // assertNull(journal.getTx(tx0.getStartTimestamp())); + // + // journal.close(); + // + // } + // + // /** + // * Verifies that access to, and operations on, a named indices is denied + // * after a COMMIT. + // * + // * @throws IOException + // */ + // public void test_runStateMachine_commit_correctRejection() + // throws IOException { + // + // final Properties properties = getProperties(); + // + // Journal journal = new Journal(properties); + // + // String name = "abc"; + // + // { + // + // journal.registerIndex(name, new UnisolatedBTree(journal)); + // + // journal.commit(); + // + // } + // + // ITx tx0 = journal.newTx(); + // + // IIndex ndx = tx0.getIndex(name); + // + // assertNotNull(ndx); + // + // tx0.prepare(); + // tx0.commit(); + // + // /* + // * Verify that you can not access a named index. + // */ + // try { + // tx0.getIndex(name); + // fail("Expecting: " + IllegalStateException.class); + // } catch (IllegalStateException ex) { + // System.err.println("Ignoring expected exception: " + ex); + // } + // + // /* + // * Verify that operations on an pre-existing index reference are now + // * denied. 
+ // */ + // try { + // ndx.lookup(new byte[] { 1 }); + // fail("Expecting: " + IllegalStateException.class); + // } catch (IllegalStateException ex) { + // System.err.println("Ignoring expected exception: " + ex); + // } + // try { + // ndx.contains(new byte[] { 1 }); + // fail("Expecting: " + IllegalStateException.class); + // } catch (IllegalStateException ex) { + // System.err.println("Ignoring expected exception: " + ex); + // } + // try { + // ndx.remove(new byte[] { 1 }); + // fail("Expecting: " + IllegalStateException.class); + // } catch (IllegalStateException ex) { + // System.err.println("Ignoring expected exception: " + ex); + // } + // try { + // ndx.insert(new byte[] { 1 }, new byte[] { 2 }); + // fail("Expecting: " + IllegalStateException.class); + // } catch (IllegalStateException ex) { + // System.err.println("Ignoring expected exception: " + ex); + // } + // + // assertFalse(tx0.isActive()); + // assertFalse(tx0.isPrepared()); + // assertFalse(tx0.isAborted()); + // assertTrue(tx0.isCommitted()); + // assertTrue(tx0.isComplete()); + // + // assertFalse(journal.activeTx.containsKey(tx0.getStartTimestamp())); + // assertFalse(journal.preparedTx.containsKey(tx0.getStartTimestamp())); + // assertNull(journal.getTx(tx0.getStartTimestamp())); + // + // journal.close(); + // + // } + } Index: TestTx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestTx.java,v retrieving revision 1.16 retrieving revision 1.17 diff -C2 -d -r1.16 -r1.17 *** TestTx.java 17 Feb 2007 21:34:12 -0000 1.16 --- TestTx.java 19 Feb 2007 19:00:18 -0000 1.17 *************** *** 57,79 **** * Test suite for fully-isolated read-write transactions. * ! * @todo Test suite for transaction isolation with respect to the underlying ! * journal. The tests in this suite are designed to verify isolation of ! * changes within the scope of the transaction when compared to the last ! * committed state of the journal. This basically amounts to verifying ! * that operations read through the transaction scope object index into ! * the journal scope object index. ! * ! * @todo Do stress test with writes, reads, and deletes. [...1731 lines suppressed...] 
- // throws RuntimeException { - // - // if( randomData != null ) { - // - // fail("Already invoked once."); - // - // } - // - // System.err.println("Random resolution of conflict."); - // - // randomData = getRandomData(journal); - // - // tx.write(id, randomData); - // - // } - // - // } - } --- 1438,1440 ---- Index: StressTestConcurrent.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/StressTestConcurrent.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** StressTestConcurrent.java 19 Feb 2007 01:05:39 -0000 1.1 --- StressTestConcurrent.java 19 Feb 2007 19:00:18 -0000 1.2 *************** *** 48,51 **** --- 48,67 ---- package com.bigdata.journal; + import java.util.Collection; + import java.util.HashSet; + import java.util.Iterator; + import java.util.List; + import java.util.Properties; + import java.util.Random; + import java.util.concurrent.Callable; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.concurrent.TimeUnit; + + import com.bigdata.isolation.UnisolatedBTree; + import com.bigdata.objndx.IIndex; + /** * Stress tests for concurrent transaction processing. *************** *** 63,71 **** } ! public void test_concurrentClients() { ! fail("write tests"); } } --- 79,253 ---- } ! /** ! * A stress test with a small pool of concurrent clients. ! */ ! public void test_concurrentClients() throws InterruptedException { ! ! Properties properties = getProperties(); ! Journal journal = new Journal(properties); ! ! final String name = "abc"; ! ! { ! journal.registerIndex(name, new UnisolatedBTree(journal)); ! ! journal.commit(); ! } ! ! try { ! ! /* ! * FIXME This is Ok with one concurrent client, but it will fail ! * with more than one concurrent client. The underlying problem is ! * that we are not serializing transactions. Once a transaction ! * begins to prepare, concurrent transactions that seek to prepare ! * must block until the transaction either aborts or commits. This ! * can be implemented by a queue of committers. Note that concurrent ! * transactions may always abort while they are running. Also note ! * that a transaction that is read only does not need to synchronize ! * since it will have an empty write set. We could also detect ! * transactions with empty write sets (no touched indices) and ! * shortcut the prepare/commit for those transactions as well. ! */ ! doConcurrentClientTest(journal,name,1,100); ! ! } finally { ! ! journal.close(); ! ! } ! ! } ! ! /** ! * A stress test with a pool of concurrent clients. ! * ! * @param journal ! * The database. ! * ! * @param name ! * The name of the index on which the transactions will ! * operation. ! * ! * @param nclients ! * The #of concurrent clients. ! * ! * @param ntx ! * The #of transactions to execute. ! * ! * @todo can this also be a correctness test if we choose the ! * read/write/delete operations carefully and maintain a ground truth ! * index? ! */ ! public void doConcurrentClientTest(Journal journal, String name, int nclients, int ntx) ! throws InterruptedException ! { ! ! ExecutorService executorService = Executors.newFixedThreadPool( ! nclients, Executors.defaultThreadFactory()); ! ! final long timeout = 5; ! ! Collection<Callable<Long>> tasks = new HashSet<Callable<Long>>(); ! ! for(int i=0; i<ntx; i++) { ! ! 
tasks.add(new Task(journal, journal.newTx(), name)); ! ! } ! ! /* ! * @todo this will fail since we are not serializing transactions in ! * prepare->commit. ! */ ! List<Future<Long>> results = executorService.invokeAll(tasks, timeout, TimeUnit.SECONDS); ! ! /* ! * @todo validate results - all execute, valid commit times, no errors. ! * ! * @todo if write-write conflicts can result, then those should be ! * acceptable errors and the task could just be retried. ! */ ! ! Iterator<Future<Long>> itr = results.iterator(); ! ! while(itr.hasNext()) { ! ! Future<Long> future = itr.next(); ! ! assertFalse(future.isCancelled()); ! ! try { ! ! assertNotSame(0L,future.get()); ! ! } catch(ExecutionException ex ) { ! ! // @todo validation errors should be allowed here. ! ! fail("Not expecting: "+ex, ex); ! ! } ! ! } } + + // @todo change to IJournal + public static class Task implements Callable<Long> { + + private final Journal journal; + private final long tx; + private final IIndex ndx; + final Random r = new Random(); + + public Task(Journal journal,long tx, String name) { + this.journal = journal; + + this.tx = tx; + + this.ndx = journal.getIndex(name,tx); + + } + + /** + * Executes random operations in the transaction. + * + * @return The commit time of the transaction. + */ + public Long call() throws Exception { + + // Random operations on the named index(s). + + for (int i = 0; i < 100; i++) { + + byte[] key = new byte[4]; + + r.nextBytes(key); + + if (r.nextInt(100) > 10) { + + byte[] val = new byte[5]; + + r.nextBytes(val); + + ndx.insert(key, val); + + } else { + + ndx.remove(key); + + } + + } + + // commit. + return journal.commit(tx); + + } + + } + } Index: TestTransactionServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestTransactionServer.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TestTransactionServer.java 8 Nov 2006 15:18:12 -0000 1.1 --- TestTransactionServer.java 19 Feb 2007 19:00:18 -0000 1.2 *************** *** 48,51 **** --- 48,53 ---- package com.bigdata.journal; + import com.bigdata.journal.TransactionServer.IsolationEnum; + import junit.framework.TestCase; *************** *** 84,89 **** } - final int segment1 = 1; - /** * Test verifies some correctness for determination of transaction ground --- 86,89 ---- *************** *** 106,118 **** * @todo This is bootstrapping a ground state, which is a bit kludgy. */ ! final long t0 = server.startTx(segment1, false, false); server.commitTx(t0); // @todo verify groundState for new transactions is t0. ! final long t1 = server.startTx(segment1, false, false); ! final long t2 = server.startTx(segment1, false, false); server.commitTx(t1); // @todo verify groundState for new transactions is t1. ! final long t3 = server.startTx(segment1, false, false); server.commitTx(t2); /* --- 106,118 ---- * @todo This is bootstrapping a ground state, which is a bit kludgy. */ ! final long t0 = server.startTx(IsolationEnum.ReadWrite); server.commitTx(t0); // @todo verify groundState for new transactions is t0. ! final long t1 = server.startTx(IsolationEnum.ReadWrite); ! final long t2 = server.startTx(IsolationEnum.ReadWrite); server.commitTx(t1); // @todo verify groundState for new transactions is t1. ! final long t3 = server.startTx(IsolationEnum.ReadWrite); server.commitTx(t2); /* *************** *** 122,126 **** * no longer an active groundState. */ ! 
final long t4 = server.startTx(segment1, false, false); server.commitTx(t3); /* --- 122,126 ---- * no longer an active groundState. */ ! final long t4 = server.startTx(IsolationEnum.ReadWrite); server.commitTx(t3); /* Index: TestRootBlockView.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestRootBlockView.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** TestRootBlockView.java 17 Feb 2007 21:34:12 -0000 1.8 --- TestRootBlockView.java 19 Feb 2007 19:00:18 -0000 1.9 *************** *** 110,115 **** assertEquals("segmentId", segmentId, rootBlock.getSegmentId()); assertEquals("nextOffset", nextOffset, rootBlock.getNextOffset()); ! assertEquals("firstTxId", firstTxId, rootBlock.getFirstTxId()); ! assertEquals("lastTxId", lastTxId, rootBlock.getLastTxId()); assertEquals("commitCounter", commitCounter, rootBlock.getCommitCounter()); assertEquals("commitTimestamp", commitTimestamp, rootBlock.getCommitTimestamp()); --- 110,115 ---- assertEquals("segmentId", segmentId, rootBlock.getSegmentId()); assertEquals("nextOffset", nextOffset, rootBlock.getNextOffset()); ! assertEquals("firstTxId", firstTxId, rootBlock.getFirstTxCommitTime()); ! assertEquals("lastTxId", lastTxId, rootBlock.getLastTxCommitTime()); assertEquals("commitCounter", commitCounter, rootBlock.getCommitCounter()); assertEquals("commitTimestamp", commitTimestamp, rootBlock.getCommitTimestamp()); *************** *** 125,130 **** assertEquals("segmentId", segmentId, rootBlock.getSegmentId()); assertEquals("nextOffset", nextOffset, rootBlock.getNextOffset()); ! assertEquals("firstTxId", firstTxId, rootBlock.getFirstTxId()); ! assertEquals("lastTxId", lastTxId, rootBlock.getLastTxId()); assertEquals("commitCounter", commitCounter, rootBlock.getCommitCounter()); assertEquals("commitTimestamp", commitTimestamp, rootBlock.getCommitTimestamp()); --- 125,130 ---- assertEquals("segmentId", segmentId, rootBlock.getSegmentId()); assertEquals("nextOffset", nextOffset, rootBlock.getNextOffset()); ! assertEquals("firstTxId", firstTxId, rootBlock.getFirstTxCommitTime()); ! assertEquals("lastTxId", lastTxId, rootBlock.getLastTxCommitTime()); assertEquals("commitCounter", commitCounter, rootBlock.getCommitCounter()); assertEquals("commitTimestamp", commitTimestamp, rootBlock.getCommitTimestamp()); Index: TestReadOnlyTx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestReadOnlyTx.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** TestReadOnlyTx.java 19 Feb 2007 01:05:39 -0000 1.2 --- TestReadOnlyTx.java 19 Feb 2007 19:00:18 -0000 1.3 *************** *** 111,117 **** */ ! ITx tx1 = journal.newTx(true); ! IIndex ndx = tx1.getIndex(name); assertNotNull(ndx); --- 111,117 ---- */ ! final long tx1 = journal.newTx(true); ! IIndex ndx = journal.getIndex(name,tx1); assertNotNull(ndx); *************** *** 126,132 **** } ! tx1.prepare(); ! ! tx1.commit(); } --- 126,130 ---- } ! journal.commit(tx1); } *************** *** 139,145 **** */ ! ITx tx1 = journal.newTx(true); ! IIndex ndx = tx1.getIndex(name); assertNotNull(ndx); --- 137,143 ---- */ ! final long tx1 = journal.newTx(true); ! IIndex ndx = journal.getIndex(name,tx1); assertNotNull(ndx); *************** *** 154,158 **** } ! tx1.abort(); } --- 152,156 ---- } ! journal.abort(tx1); } |
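The client-facing pattern after this api change, reconstructed from the tests above, is: obtain a start timestamp from Journal#newTx(), resolve isolated indices via Journal#getIndex(String, long), and finish with Journal#commit(long) or Journal#abort(long). A minimal sketch follows; the index name and the error handling are illustrative assumptions, not part of this commit:

import java.util.Properties;

import com.bigdata.isolation.UnisolatedBTree;
import com.bigdata.journal.Journal;
import com.bigdata.journal.ValidationError;
import com.bigdata.objndx.IIndex;

public class TimestampApiSketch {

    public static void main(String[] args) {

        Journal journal = new Journal(new Properties());

        // Register an index and commit so it is visible to new transactions.
        journal.registerIndex("abc", new UnisolatedBTree(journal));
        journal.commit();

        // Clients now hold only the start timestamp; ITx stays internal.
        final long tx = journal.newTx();

        try {
            IIndex ndx = journal.getIndex("abc", tx);
            ndx.insert(new byte[] { 1 }, new byte[] { 1 });
            journal.commit(tx); // validates and merges down the write set
        } catch (ValidationError ex) {
            // Unresolvable write-write conflict; the tx is already aborted.
        } finally {
            journal.close();
        }

    }

}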
From: Bryan T. <tho...@us...> - 2007-02-19 19:00:32
Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/isolation In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv30034/src/test/com/bigdata/isolation Modified Files: TestIsolatedBTree.java Log Message: Some more work on transaction support, including an api change that makes ITx an internal api and presents just a timestamp to clients. This removes the potential for misuse of prepare/commit by the clients and sets us up for an ITransactionManager api. Index: TestIsolatedBTree.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/isolation/TestIsolatedBTree.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TestIsolatedBTree.java 15 Feb 2007 01:34:23 -0000 1.1 --- TestIsolatedBTree.java 19 Feb 2007 19:00:23 -0000 1.2 *************** *** 48,51 **** --- 48,52 ---- package com.bigdata.isolation; + import com.bigdata.journal.TestTx; import com.bigdata.objndx.AbstractBTreeTestCase; import com.bigdata.objndx.BTreeMetadata; *************** *** 65,69 **** * @version $Id$ * ! * @todo review tests of the point apis. * * @todo test the batch apis. all methods must work with {@link Value}s (the --- 66,73 ---- * @version $Id$ * ! * @todo review tests of the point apis, especially with respect to ! * {@link TestTx}. all of the nitty gritty should be first tested in this ! * suite, including the specific values of the version counters and the ! * presence / absence of deletion markers, since those data are protected. * * @todo test the batch apis. all methods must work with {@link Value}s (the *************** *** 80,84 **** * @todo test entryIterator() - it visits only those that are not deleted. * ! * @todo write tests of validate() and mergeDown() */ public class TestIsolatedBTree extends AbstractBTreeTestCase { --- 84,90 ---- * @todo test entryIterator() - it visits only those that are not deleted. * ! * @todo write tests of validate() and mergeDown(). note that these are also ! * tested by {@link TestTx} and friends. However, we can verify the ! * version counters and delete flags in this package. */ public class TestIsolatedBTree extends AbstractBTreeTestCase { |
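The version counters and delete markers mentioned in these todos are what validation inspects: an isolated write records the version counter it read, and a write-write conflict exists when the ground state's counter has advanced in the meantime. A simplified illustration of that check (an assumption about the mechanism as described here, not the actual IsolatedBTree code, which also consults delete markers and any registered conflict resolver):

/**
 * True iff a concurrent transaction has committed a write on the same key:
 * the version counter in the ground state no longer matches the counter
 * that this transaction read when it buffered its write.
 */
static boolean isWriteWriteConflict(long counterReadByTx, long counterInGroundState) {

    return counterInGroundState != counterReadByTx;

}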
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv30034/src/java/com/bigdata/journal Modified Files: Tx.java IRootBlockView.java RootBlockView.java TransactionServer.java Journal.java TemporaryStore.java Log Message: Some more work on transaction support, including an api change that makes ITx an internal api and presents just a timestamp to clients. This removes the potential for misuse of prepare/commit by the clients and sets us up for an ITransactionManager api. Index: TransactionServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/TransactionServer.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** TransactionServer.java 17 Feb 2007 21:34:17 -0000 1.2 --- TransactionServer.java 19 Feb 2007 19:00:20 -0000 1.3 *************** *** 50,67 **** import java.util.BitSet; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; ! ! import com.bigdata.util.TimestampFactory; /** ! * The transaction server is responsible for generating unique transaction ! * identifiers, determining when a transaction is "dead" (through inactivity), ! * and notifying journals when they may GC completed transactions. The ! * transaction server may either be integrated into an embedded database or run ! * as a replicated service for a distributed database. The transaction ! * identifier is simply a unique timestamp with nanosecond precision. The ! * transaction identifiers are generated by a centralized service. For an ! * embedded database, that service is local. For a distributed database the ! * service must be resolved, e.g., using JINI. * * @todo Define an API for the transaction server. Implement an embedded version --- 50,68 ---- import java.util.BitSet; import java.util.Map; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; ! import java.util.concurrent.LinkedBlockingQueue; /** ! * The transaction server is responsible for starting, preparing, and committing ! * transactions, for assigning unique start and commit timestamps to ! * transactions, for determining when a transaction is "dead" (through ! * inactivity), and for coordinating a restart-safe process for releasing ! * resources no longer accessible to any active transaction. The transaction ! * server may either be integrated into an embedded database or run as a ! * replicated service for a distributed database. The transaction start time ! * serves as a unique transaction identifier. Unique timestamps are generated by a ! * centralized service with nanosecond precision, which is typically co-located ! * with the {@link TransactionServer}. * * @todo Define an API for the transaction server. Implement an embedded version *************** *** 73,95 **** * curves in terms of latency and maximum concurrency. * ! * @todo The distributed database version of the transaction server needs to ! * know where to find each segment. In the embedded distributed scenario, ! * the segments are all local (within the VM). In the non-embedded ! * scenario, the segments need to be discovered using the same protocol ! * that the clients use to discover segments. ! * ! * @todo A centralized transaction service will create an eventual concurrency ! * bottleneck. There must be 2-3 RPCs to the transaction server per ! * transaction (start -> prepare | abort -> commit), plus a probable ! * heartbeat.
The #of concurrent client connections to the server will be ! * a limiting factor unless we use UDP (vs TCP) for the transaction server ! * RPC protocol. The bottleneck will probably be upwards of 1000s of ! * concurrent transactions. I would expect that 10k concurrent ! * transactions could be the limit, and should certainly be an internal ! * target. Achieving higher concurrency will require more localized ! * mechanisms, e.g., single row atomic updates. There is also a use case ! * for read committed transactions in order to permit earlier GC with ! * very very long running transactions, which would otherwise defer GC ! * until their completion. * * @todo Concurrent transactions define a dependency graph of (a) transactions --- 74,93 ---- * curves in terms of latency and maximum concurrency. * ! * @todo A centralized transaction service will create a concurrency bottleneck. ! * There must be 2-3 RPCs to the transaction server per transaction (start -> ! * prepare | abort -> commit), plus a heartbeat. Some RPCs can bundle a ! * heartbeat (or vice versa) to minimize traffic. The #of concurrent ! * client connections to the server will be a limiting factor unless we ! * use UDP (vs TCP) for the transaction server RPC protocol. The ! * bottleneck will probably be upwards of 1000s of concurrent ! * transactions. One limiting factor will be the latency of the commit. ! * This can be reduced by replicating the journal onto multiple hosts so ! * that we do not need to synchronize to disk during a commit. I would ! * expect that 10k concurrent transactions could be the limit, and should ! * certainly be an internal target. Achieving higher concurrency will ! * require more localized mechanisms, e.g., single row atomic updates. ! * There is also a use case for read committed transactions in order to ! * permit earlier GC with very very long running transactions, which would ! * otherwise defer GC until their completion. * * @todo Concurrent transactions define a dependency graph of (a) transactions *************** *** 118,122 **** /** ! * Class modeling transaction metadata. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> --- 116,161 ---- /** ! * The service used to generate commit timestamps. ! * ! * @todo parameterize using {@link Options} so that we can resolve a ! * low-latency service for use with a distributed database commit ! * protocol. ! */ ! protected final ITimestampService timestampFactory = LocalTimestampService.INSTANCE; ! ! /** ! * Isolation levels for a transaction. ! * ! * @author <a href="mailto:tho...@us...">Bryan Thompson</a> ! * @version $Id$ ! */ ! public static enum IsolationEnum { ! ! /** ! * A fully isolated read-write transaction. ! */ ! ReadWrite(), ! ! /** ! * A fully isolated read-only transaction. ! */ ! ReadOnly(), ! ! /** ! * A read-only transaction that will read any data successfully ! * committed on the database (the view provided by the transaction does ! * not remain fixed as of the transaction start time but evolves as ! * concurrent transactions commit). ! */ ! ReadCommitted(); ! ! private IsolationEnum() {} ! ! } ! ! /** ! * Class modeling transaction metadata. An instance of this class is used to ! * model a transaction in the {@link TransactionServer}. {@link ITx} ! * objects are used to model the transaction local to a {@link Journal}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> *************** *** 185,192 **** Map<Long,TxMetadata> committed = new ConcurrentHashMap<Long, TxMetadata>(); !
public TxMetadata(long ts,boolean readOnly,boolean readCommitted) { this.ts = ts; ! this.readOnly = readOnly; ! this.readCommitted = readCommitted; // runState = RunState.ACTIVE; --- 224,235 ---- Map<Long,TxMetadata> committed = new ConcurrentHashMap<Long, TxMetadata>(); ! public TxMetadata(long ts,IsolationEnum isolationLevel) { ! this.ts = ts; ! ! this.readOnly = isolationLevel == IsolationEnum.ReadOnly ! || isolationLevel == IsolationEnum.ReadCommitted; ! ! this.readCommitted = isolationLevel == IsolationEnum.ReadCommitted; // runState = RunState.ACTIVE; *************** *** 206,209 **** --- 249,276 ---- /** + * A blocking queue that imposes serializability on transactions. A writable + * transaction that attempts to {@link ITx#prepare()} is placed onto this + * queue. When its turn comes, it will validate its write set. + * + * FIXME this really belongs in the {@link TransactionServer} rather than + * the {@link Journal}. The {@link TransactionServer} is responsible for + * serializing transactions and coordinating 2-phase commits. It should + * accomplish this by placing transactions that issue COMMIT requests onto a + * queue that imposes serial execution of the commit protocol. The + * transaction on the head of the queue will first prepare and then commit + * as soon as it is prepared. If the transaction fails validation, then it + * must be aborted, but it could be retried by the client. The application + * should only request a COMMIT. The {@link TransactionServer} is + * responsible for issuing PREPARE requests to all resources on which writes + * have been made during the transaction and then issuing COMMIT requests to + * those resources once they have all suceessfully prepared. + * + * @todo a concurrent hash map for the preparing/committing transactions + * could be kept locally on the journal in order to detect violations + * of serializability by the {@link TransactionServer}. + */ + final BlockingQueue<ITx> commitQueue = new LinkedBlockingQueue<ITx>(); + + /** * Map containing metadata for active transactions. */ *************** *** 232,241 **** /** ! * A reference to the transaction metadata for the transaction that will ! * serve as the ground state for any new transactions. The groundState ! * transaction is updated each time a transaction commits. This field is ! * initially null. ! * ! * @todo Perhaps this should be a timestamp rather than a reference? * * @todo Provide for bootstrapping this field. For a new database, the field --- 299,305 ---- /** ! * The timestamp that will serve as the ground state for any new ! * transactions. The groundState is updated each time a transaction commits ! * to the commit time of that transaction. This field is initially 0L. * * @todo Provide for bootstrapping this field. For a new database, the field *************** *** 246,250 **** * of new transactions. */ ! private TxMetadata groundState = null; /** --- 310,314 ---- * of new transactions. */ ! private long groundState = 0L; /** *************** *** 257,285 **** * Start a new transaction. * ! * @param segment ! * The first segment that will be opened by the transaction or -1 ! * if the client elects not to notify the transaction server of ! * the first segment at the time that the transaction starts. ! * ! * @param readOnly ! * true iff the transaction is read-only. ! * ! * @param readCommitted ! * true iff the transaction is not fully isolated (reads of ! * updates committed by concurrent transactions are permitted). * ! 
* @return The unique transaction identifier assigned to the new ! * transaction. * ! * @todo This API suggests that -1 is not a valid segment identifier. We ! * could have made 0 the invalid segment identifier just as easily. In ! * practice, I expect that segment identifiers will be positive ! * integers. */ ! public long startTx(int segment, boolean readOnly, boolean readCommitted) { ! long ts = TimestampFactory.nextNanoTime(); ! transactions.put(ts, new TxMetadata(ts,readOnly,readCommitted)); return ts; --- 321,339 ---- * Start a new transaction. * ! * @param isolationLevel ! * The isolation level for the transaction. * ! * @return The unique transaction start time assigned to the transaction. * ! * @todo support clients that eagerly declare their intention in terms of ! * {indexName,separatorKey} tuples. */ ! public long startTx(IsolationEnum isolationLevel) { ! ! assert isolationLevel != null; ! long ts = timestampFactory.nextTimestamp(); ! transactions.put(ts, new TxMetadata(ts,isolationLevel)); return ts; *************** *** 338,344 **** * Commit a transaction. When the transaction is distributed, this prepares * the transaction and then commits the transaction according to a ! * multi-phase commit protocol. Notice will be issued to all segments that ! * have been written or read by the transaction. The transaction identifier ! * will be invalidated. * * @param ts --- 392,398 ---- * Commit a transaction. When the transaction is distributed, this prepares * the transaction and then commits the transaction according to a ! * multi-phase commit protocol. Notice will be issued to all resources on ! * which the transaction has written. The transaction identifier will be ! * invalidated. * * @param ts *************** *** 513,516 **** --- 567,591 ---- } + // /** + // * A description of an index partition used to declare the index partition + // * as part of the transaction. + // * + // * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + // * @version $Id$ + // */ + // public static class PartitionMetadata { + // + // /** + // * The index name. + // */ + // public static final String name; + // + // /** + // * The separator key for the index partition. + // */ + // public static final byte[] separatorKey; + // + // } + } Index: Journal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Journal.java,v retrieving revision 1.52 retrieving revision 1.53 diff -C2 -d -r1.52 -r1.53 *** Journal.java 19 Feb 2007 01:05:39 -0000 1.52 --- Journal.java 19 Feb 2007 19:00:20 -0000 1.53 *************** *** 1,45 **** /** ! The Notice below must appear in each file of the Source Code of any ! copy you distribute of the Licensed Product. Contributors to any ! Modifications may add their own copyright notices to identify their ! own contributions. ! License: ! The contents of this file are subject to the CognitiveWeb Open Source [...989 lines suppressed...] * ! * @return The transaction with that start time or <code>null</code> if ! * the start time is not mapped to either an active or prepared ! * transaction. */ ! public ITx getTx(long startTimestamp) { ! ITx tx = activeTx.get(startTimestamp); if (tx == null) { ! tx = preparedTx.get(startTimestamp); } ! 
return tx; } } Index: TemporaryStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/TemporaryStore.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** TemporaryStore.java 17 Feb 2007 21:34:17 -0000 1.4 --- TemporaryStore.java 19 Feb 2007 19:00:23 -0000 1.5 *************** *** 332,336 **** public IIndex getIndex(String name) { ! if(name==null) throw new IllegalArgumentException(); return name2Addr.get(name); --- 332,337 ---- public IIndex getIndex(String name) { ! if (name == null) ! throw new IllegalArgumentException(); return name2Addr.get(name); Index: Tx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Tx.java,v retrieving revision 1.30 retrieving revision 1.31 diff -C2 -d -r1.30 -r1.31 *** Tx.java 19 Feb 2007 01:05:39 -0000 1.30 --- Tx.java 19 Feb 2007 19:00:20 -0000 1.31 *************** *** 271,274 **** --- 271,297 ---- } + + /** + * The hash code is based on the {@link #getStartTimestamp()}. + * + * @todo pre-compute this value if it is used much. + */ + final public int hashCode() { + + return Long.valueOf(startTimestamp).hashCode(); + + } + + /** + * True iff they are the same object or have the same start timestamp. + * + * @param o + * Another transaction object. + */ + final public boolean equals(ITx o) { + + return this == o || startTimestamp == o.getStartTimestamp(); + + } /** *************** *** 339,345 **** /* ! * Delete the TemporaryStore. */ ! tmpStore.close(); } --- 362,372 ---- /* ! * Close and delete the TemporaryStore. */ ! if (tmpStore.isOpen()) { ! ! tmpStore.close(); ! ! } } *************** *** 706,714 **** * @return The named index or <code>null</code> if no index is registered * under that name. */ public IIndex getIndex(String name) { ! if(name==null) throw new IllegalArgumentException(); ! /* * Store the btrees in hash map so that we can recover the same instance --- 733,751 ---- * @return The named index or <code>null</code> if no index is registered * under that name. + * + * @exception IllegalStateException + * if the transaction is not active. */ public IIndex getIndex(String name) { ! if (name == null) ! throw new IllegalArgumentException(); ! ! if (!isActive()) { ! ! throw new IllegalStateException(); ! ! } ! /* * Store the btrees in hash map so that we can recover the same instance Index: RootBlockView.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/RootBlockView.java,v retrieving revision 1.11 retrieving revision 1.12 diff -C2 -d -r1.11 -r1.12 *** RootBlockView.java 17 Feb 2007 21:34:17 -0000 1.11 --- RootBlockView.java 19 Feb 2007 19:00:20 -0000 1.12 *************** *** 278,282 **** } ! public long getFirstTxId() { return buf.getLong(OFFSET_FIRST_TX); --- 278,282 ---- } ! public long getFirstTxCommitTime() { return buf.getLong(OFFSET_FIRST_TX); *************** *** 284,288 **** } ! public long getLastTxId() { return buf.getLong(OFFSET_LAST_TX); --- 284,288 ---- } ! public long getLastTxCommitTime() { return buf.getLong(OFFSET_LAST_TX); *************** *** 369,374 **** sb.append(", segmentId="+getSegmentId()); sb.append(", nextOffset="+getNextOffset()); ! sb.append(", firstTxId="+getFirstTxId()); ! 
sb.append(", lastTxId="+getLastTxId()); sb.append(", commitTimestamp="+getCommitTimestamp()); sb.append(", commitCounter="+getCommitCounter()); --- 369,374 ---- sb.append(", segmentId="+getSegmentId()); sb.append(", nextOffset="+getNextOffset()); ! sb.append(", firstTxId="+getFirstTxCommitTime()); ! sb.append(", lastTxId="+getLastTxCommitTime()); sb.append(", commitTimestamp="+getCommitTimestamp()); sb.append(", commitCounter="+getCommitCounter()); Index: IRootBlockView.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/IRootBlockView.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** IRootBlockView.java 17 Feb 2007 21:34:17 -0000 1.7 --- IRootBlockView.java 19 Feb 2007 19:00:20 -0000 1.8 *************** *** 107,125 **** /** ! * The timestamp of the earliest transaction whose data are on the store or ! * 0L iff no transactions have committed on the store. * ! * @return The timestamp of the earliest transaction committed on the store. */ ! public long getFirstTxId(); /** ! * The timestamp of the most recent transaction whose data are on the store ! * or 0L iff no transactions have committed on the store. * ! * @return The timestamp of the most recent transaction committed on the * store. */ ! public long getLastTxId(); /** --- 107,126 ---- /** ! * The commit time of the earliest transaction whose data are on the store ! * or 0L iff no transactions have committed on the store. * ! * @return The commit time of the earliest transaction committed on the ! * store. */ ! public long getFirstTxCommitTime(); /** ! * The commit time of the most recent transaction whose data are on the ! * store or 0L iff no transactions have committed on the store. * ! * @return The commit time of the most recent transaction committed on the * store. */ ! public long getLastTxCommitTime(); /** |
From: Bryan T. <tho...@us...> - 2007-02-19 01:05:50
Update of /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv8025/src/java/com/bigdata/rdf Modified Files: TripleStore.java Log Message: Working on transaction processing support. Index: TripleStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/TripleStore.java,v retrieving revision 1.17 retrieving revision 1.18 diff -C2 -d -r1.17 -r1.18 *** TripleStore.java 17 Feb 2007 23:15:27 -0000 1.17 --- TripleStore.java 19 Feb 2007 01:05:47 -0000 1.18 *************** *** 67,70 **** --- 67,71 ---- import com.bigdata.journal.IJournal; import com.bigdata.journal.Journal; + import com.bigdata.journal.Tx; import com.bigdata.objndx.BTree; import com.bigdata.objndx.IIndex; *************** *** 93,96 **** --- 94,137 ---- * A triple store based on the <em>bigdata</em> architecture. * + * @todo Refactor to support transactions and concurrent load/query + * <p> + * Conflicts arise in the bigdata-RDF store when concurrent transactions + * attempt to define the same term. The problem arises because one index is + * used to map the term to a unique identifier and another to map the + * identifiers back to terms. Further, the statement indices use term + * identifiers directly in their keys. Therefore, resolving concurrent + * definition of the same term requires that we either do NOT isolate the + * writes on the term indices (which is probably an acceptable strategy) + * or that we let the application order the pass over the isolated indices + * and give the conflict resolver access to the {@link Tx} so that it can + * update the dependent indices if a conflict is discovered on the terms + * index. + * <p> + * The simplest approach appears to be NOT isolating the terms and ids + * indices. As long as the logic resides at the index, e.g., a lambda + * expression/method, to assign the identifier and create the entry in the + * ids index, we can get by with less isolation. If concurrent processes + * attempt to define the same term, then one or the other will wind up + * executing first (writes on indices are single threaded) and the result + * will be coherent as long as the write is committed before the ids are + * returned to the application. It simply does not matter which process + * defines the term since all that we care about is atomic, consistent, + * and durable. This is a case where group commit would work well (updates + * are batched together on the server automatically to improve + * throughput). + * <p> + * Concurrent assertions of the same statement cause write-write + * conflicts, but they are trivially resolved -- we simply ignore the + * write-write conflict since both transactions agree on the statement + * data. Unlike the term indices, isolation is important for statements + * since we want to guarantee that a set of statements either is or is not + * asserted atomically. (With the terms index, we could not care less as long + * as the indices are coherent.) + * <p> + * The only concern with the statement indices occurs when one transaction + * asserts a statement and a concurrent transaction deletes a statement. I + * need to go back and think this one through some more and figure out + * whether or not we need to abort a transaction in this case. + * + * @todo Refactor to use a delegation mechanism so that we can run with or * without partitioned indices?
(All you have to do now is change the *************** *** 98,104 **** * handle some different initialization properties.) * ! * @todo Play with the branching factor again. Now that we are using overflow ! * to evict data onto index segments we can use a higher branching factor ! * and simply evict more often. Is this worth it? We might want a lower * branching factor on the journal since we can not tell how large any * given write will be and then use larger branching factors on the index --- 139,145 ---- * handle some different initialization properties.) * ! * @todo Play with the branching factor again. Now that we are using overflow to ! * evict data onto index segments we can use a higher branching factor and ! * simply evict more often. Is this worth it? We might want a lower * branching factor on the journal since we can not tell how large any * given write will be and then use larger branching factors on the index |
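The "do NOT isolate the term indices" strategy in the javadoc above can be sketched concretely. The following is illustrative only: the index names, the identifier counter, and the byte encoding are assumptions rather than TripleStore code. The point it demonstrates is that writes on an unisolated index are single threaded, so whichever process defines a term first wins and every caller receives the same identifier:

import java.util.concurrent.atomic.AtomicLong;

import com.bigdata.journal.Journal;
import com.bigdata.objndx.IIndex;

public class TermIdSketch {

    // Hypothetical identifier factory; a real one must be restart-safe.
    private static final AtomicLong nextId = new AtomicLong(1);

    /** Assign (or recover) the unique identifier for a term key. */
    static long addTerm(Journal journal, byte[] termKey) {

        IIndex terms = journal.getIndex("terms"); // unisolated: writes are single threaded

        byte[] val = (byte[]) terms.lookup(termKey);

        if (val == null) {

            val = encode(nextId.getAndIncrement());

            terms.insert(termKey, val);

            // Maintain the reverse mapping from identifier to term.
            journal.getIndex("ids").insert(val, termKey);

        }

        return decode(val); // coherent regardless of which writer ran first

    }

    static byte[] encode(long id) {
        byte[] b = new byte[8];
        for (int i = 0; i < 8; i++) b[i] = (byte) (id >>> (56 - 8 * i));
        return b;
    }

    static long decode(byte[] b) {
        long id = 0L;
        for (int i = 0; i < 8; i++) id = (id << 8) | (b[i] & 0xFFL);
        return id;
    }

}

As the javadoc notes, the result is only coherent if the write is committed before the identifiers are returned to the application; the sketch omits that commit for brevity.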
From: Bryan T. <tho...@us...> - 2007-02-19 01:05:49
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv7992/src/java/com/bigdata/journal Modified Files: Tx.java Journal.java ITx.java Added Files: ValidationError.java Log Message: Working on transaction processing support. --- NEW FILE: ValidationError.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 17, 2007 */ package com.bigdata.journal; /** * An instance of this class is thrown when a transaction {@link ITx#prepare()}s * if there is a write-write conflict that can not be resolved. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class ValidationError extends RuntimeException { /** * */ private static final long serialVersionUID = 7606167478216451303L; /** * */ public ValidationError() { } /** * @param message */ public ValidationError(String message) { super(message); } /** * @param cause */ public ValidationError(Throwable cause) { super(cause); } /** * @param message * @param cause */ public ValidationError(String message, Throwable cause) { super(message, cause); } } Index: ITx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/ITx.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** ITx.java 17 Feb 2007 21:34:17 -0000 1.2 --- ITx.java 19 Feb 2007 01:05:39 -0000 1.3 *************** *** 70,75 **** * * @exception IllegalStateException ! * If the transaction is not active. If the transaction is * not complete, then it will be aborted. */ public void prepare(); --- 70,79 ---- * * @exception IllegalStateException ! * If the transaction is not active. If the transaction is * not complete, then it will be aborted. + * + * @exception ValidationError + * If the transaction can not be validated. If this exception + * is thrown, then the transaction was aborted. 
*/ public void prepare(); Index: Tx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Tx.java,v retrieving revision 1.29 retrieving revision 1.30 diff -C2 -d -r1.29 -r1.30 *** Tx.java 17 Feb 2007 21:34:17 -0000 1.29 --- Tx.java 19 Feb 2007 01:05:39 -0000 1.30 *************** *** 57,61 **** import com.bigdata.objndx.IndexSegment; import com.bigdata.objndx.ReadOnlyIndex; - import com.bigdata.rawstore.IRawStore; import com.bigdata.scaleup.MetadataIndex; import com.bigdata.scaleup.PartitionedIndex; --- 57,60 ---- *************** *** 72,75 **** --- 71,89 ---- * rejected and {@link #prepare()} and {@link #commit()} are NOPs. * </p> + * <p> + * The write set of a transaction is written onto a {@link TemporaryStore}. + * Therefore the size limit on the transaction write set is currently 2G, but + * the transaction will run in memory up to 100M. The {@link TemporaryStore} is + * closed and any backing file is deleted as soon as the transaction completes. + * </p> + * <p> + * Each {@link IsolatedBTree} is local to a transaction and is backed by its own + * store. This means that concurrent transactions can execute without + * synchronization (real concurrency) up to the point where they + * {@link #prepare()}. We do not need a read-lock on the indices isolated by + * the transaction since they are <em>historical</em> states that will not + * receive concurrent updates. This might prove to be a nice way to leverage + * multiple processors / cores on a data server. + * </p> * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> *************** *** 93,104 **** * we can get the commit record more cheaply. * - * @todo The write set of a transaction is currently written onto an independent - * store. This means that concurrent transactions can actually execute - * concurrently. We do not even need a read-lock on the indices isolated - * by the transaction since they are read-only. This might prove to be a - * nice way to leverage multiple processors / cores on a data server. The - * size limit on the transaction write set is currently 2G, but the - * transaction will run in memory up to 100M. - * * @todo Support transactions where the indices isolated by the transactions are * {@link PartitionedIndex}es. --- 107,110 ---- *************** *** 130,135 **** * server failed to test the pre-conditions and they were not met * ! * @todo Is it possible to have more than one transaction PREPARE must ! * concurrent PREPARE operations be serialized? */ public class Tx implements IStore, ITx { --- 136,142 ---- * server failed to test the pre-conditions and they were not met * ! * @todo PREPARE operations must be serialized unless they are provably ! * non-overlapping. This will require a handshake with either the ! * {@link Journal} or (more likely) the {@link TransactionServer}. */ public class Tx implements IStore, ITx { *************** *** 196,200 **** * time it is invoked. */ ! final protected IRawStore tmpStore = new TemporaryStore(); /** --- 203,207 ---- * time it is invoked. */ ! final protected TemporaryStore tmpStore = new TemporaryStore(); /** *************** *** 281,287 **** /** ! * Return the commit startTimestamp assigned to this transaction. * ! * @return The commit startTimestamp assigned to this transaction. * * @exception IllegalStateException --- 288,294 ---- /** ! * Return the commit timestamp assigned to this transaction. * ! * @return The commit timestamp assigned to this transaction. 
* * @exception IllegalStateException *************** *** 318,330 **** /** ! * This method must be invoked any time a transaction completes (aborts or ! * commits) in order to release the hard references to any named btrees ! * isolated within this transaction so that the JVM may reclaim the space ! * allocated to them on the heap. */ ! private void releaseBTrees() { btrees.clear(); } --- 325,346 ---- /** ! * This method must be invoked any time a transaction completes ({@link #abort()}s ! * or {@link #commit()}s) in order to release resources held by that ! * transaction. */ ! protected void releaseResources() { + /* + * Release hard references to any named btrees isolated within this + * transaction so that the JVM may reclaim the space allocated to them + * on the heap. + */ btrees.clear(); + /* + * Delete the TemporaryStore. + */ + tmpStore.close(); + } *************** *** 368,373 **** /* ! * Validate against the current state of the journal's object ! * index. */ --- 384,389 ---- /* ! * Validate against the current state of the various indices ! * on write the transaction has written. */ *************** *** 376,384 **** abort(); ! throw new RuntimeException( ! "Validation failed: write-write conflict"); } } catch (Throwable t) { --- 392,403 ---- abort(); ! throw new ValidationError(); } + } catch( ValidationError ex) { + + throw ex; + } catch (Throwable t) { *************** *** 408,411 **** --- 427,433 ---- * </p> * + * @return The commit timestamp or <code>0L</code> if the transaction was + * read-only. + * * @exception IllegalStateException * if the transaction has not been *************** *** 488,496 **** } finally { ! releaseBTrees(); } ! return getCommitTimestamp(); } --- 510,518 ---- } finally { ! releaseResources(); } ! return readOnly ? 0L : getCommitTimestamp(); } *************** *** 515,519 **** } finally { ! releaseBTrees(); } --- 537,541 ---- } finally { ! releaseResources(); } *************** *** 662,666 **** */ UnisolatedBTree groundState = (UnisolatedBTree)journal.getIndex(name); ! isolated.mergeDown(groundState); --- 684,693 ---- */ UnisolatedBTree groundState = (UnisolatedBTree)journal.getIndex(name); ! ! /* ! * Copy the validated write set for this index down onto the ! * corresponding unisolated index, updating version counters, delete ! * markers, and values as necessary in the unisolated index. ! */ isolated.mergeDown(groundState); Index: Journal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Journal.java,v retrieving revision 1.51 retrieving revision 1.52 diff -C2 -d -r1.51 -r1.52 *** Journal.java 17 Feb 2007 21:34:18 -0000 1.51 --- Journal.java 19 Feb 2007 01:05:39 -0000 1.52 *************** *** 1508,1511 **** --- 1508,1525 ---- } + + /** + * True iff there is a transaction that has prepared and not yet committed + * or aborted. + * <p> + * Note: It is illegal for more than one transaction to prepare at a time + * if they have write sets that overlap. Serializability depends on each + * transaction validating against the last committed ground state. + */ + public boolean isPreparedTx() { + + return ! preparedTx.isEmpty(); + + } } |
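Until prepare/commit serialization lands in the Journal or TransactionServer, a caller can impose it externally. One possible arrangement (an assumption, not the committed design) funnels every prepare/commit through a single-threaded executor so that each transaction validates against the last committed ground state:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.bigdata.journal.ITx;

public class SerializedCommitSketch {

    // All transactions prepare and commit on this one thread, in arrival order.
    private final ExecutorService commitService = Executors.newSingleThreadExecutor();

    public Future<?> commitAsync(final ITx tx) {

        return commitService.submit(new Runnable() {

            public void run() {

                tx.prepare(); // may throw ValidationError, which aborts the tx

                tx.commit();

            }

        });

    }

}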
From: Bryan T. <tho...@us...> - 2007-02-19 01:05:49
Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv7992/src/test/com/bigdata/journal Modified Files: TestJournalBasics.java TestConflictResolution.java TestReadOnlyTx.java Added Files: StressTestConcurrent.java Log Message: Working on transaction processing support. Index: TestJournalBasics.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestJournalBasics.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** TestJournalBasics.java 17 Feb 2007 21:34:13 -0000 1.8 --- TestJournalBasics.java 19 Feb 2007 01:05:39 -0000 1.9 *************** *** 101,111 **** * tests of transaction support. */ // tests of transitions in the transaction RunState state machine. suite.addTestSuite( TestTxRunState.class ); // @todo update these tests of the tx-journal integration. suite.addTestSuite( TestTxJournalProtocol.class ); ! // @todo tests of read-write transactions. suite.addTestSuite( TestTx.class ); ! // @todo tests of read-only transactions. suite.addTestSuite( TestReadOnlyTx.class ); // @todo tests of read-committed transactions. --- 101,112 ---- * tests of transaction support. */ + // tests of transitions in the transaction RunState state machine. suite.addTestSuite( TestTxRunState.class ); // @todo update these tests of the tx-journal integration. suite.addTestSuite( TestTxJournalProtocol.class ); ! // tests of read-write transactions and isolation. suite.addTestSuite( TestTx.class ); ! // tests of read-only transactions. suite.addTestSuite( TestReadOnlyTx.class ); // @todo tests of read-committed transactions. *************** *** 113,118 **** // @todo tests of concurrent schedules and conflict detection. suite.addTestSuite( TestConcurrentSchedules.class ); ! // @todo tests of write-write conflict resolution. suite.addTestSuite(TestConflictResolution.class); return suite; --- 114,146 ---- // @todo tests of concurrent schedules and conflict detection. suite.addTestSuite( TestConcurrentSchedules.class ); ! // @todo tests of write-write conflict resolution. suite.addTestSuite(TestConflictResolution.class); + + /* + * @todo tests of batch api and group commit mechanisms for very high + * volume updates. These tests might be more relevant to the data server + * since group commit can be achieved by transparently collecting small + * non-conflicting updates into a transaction that succeeds or fails all + * updates in the group. In this model transactions are local and + * updates do not have atomicity across journals. Another alternative is + * to use a pulse to commit a global group update transaction with a + * frequency that trades off the size of the commit groups against + * latency. The advantage of the latter approach is that it can be + * combined with normal transaction processing in a trivial manner and I + * am not sure whether or not that is true of the former approach. + */ + + /* + * @todo stress tests of concurrent transactions parameterized so that + * we can also test overflow handling and scale up. Scale out testing + * may require a refactor of the clients to lookup services. This could + * be moved out of the per-journal strategy test suite and performed + * only for the target strategy, e.g., Direct. Measure throughput rates + * and compare TPS and write/read rates with other systems. + * + * @todo we could add tests based on known transaction processing + * benchmarks here as well.
+ */ + suite.addTestSuite(StressTestConcurrent.class); return suite; Index: TestConflictResolution.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestConflictResolution.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TestConflictResolution.java 17 Feb 2007 21:34:12 -0000 1.1 --- TestConflictResolution.java 19 Feb 2007 01:05:39 -0000 1.2 *************** *** 48,51 **** --- 48,57 ---- package com.bigdata.journal; + import java.util.Properties; + + import com.bigdata.isolation.IConflictResolver; + import com.bigdata.isolation.UnisolatedBTree; + import com.bigdata.isolation.Value; + /** * Tests of write-write conflict resolution. *************** *** 60,64 **** */ public TestConflictResolution() { - // TODO Auto-generated constructor stub } --- 66,69 ---- *************** *** 68,77 **** public TestConflictResolution(String name) { super(name); - // TODO Auto-generated constructor stub } ! public void test_something() { ! fail("write tests"); } --- 73,323 ---- public TestConflictResolution(String name) { super(name); } ! /** ! * Test correct detection of a write-write conflict. An index is registered ! * and the journal is committed. Two transactions (tx1, tx2) are then ! * started. Both transactions write a value under the same key. tx1 prepares ! * and commits. tx2 attempts to prepare, and the test verifies that a ! * {@link ValidationError} is reported. ! */ ! public void test_writeWriteConflict_correctDetection() { ! ! Properties properties = getProperties(); ! ! Journal journal = new Journal(properties); ! ! String name = "abc"; ! ! final byte[] k1 = new byte[] { 1 }; ! ! final byte[] v1a = new byte[] { 1 }; ! final byte[] v1b = new byte[] { 2 }; ! ! { ! ! /* ! * register an index and commit the journal. ! */ ! ! journal.registerIndex(name, new UnisolatedBTree(journal)); ! ! journal.commit(); ! ! } ! ! /* ! * Create two transactions. ! */ ! ITx tx1 = journal.newTx(); ! ! ITx tx2 = journal.newTx(); ! ! /* ! * Write a value under the same key on the same index in both ! * transactions. ! */ ! ! tx1.getIndex(name).insert(k1, v1a); ! ! tx2.getIndex(name).insert(k1, v1b); ! ! tx1.prepare(); ! ! tx1.commit(); ! ! /* ! * verify that the value from tx1 is found under the key on the ! * unisolated index. ! */ ! assertEquals(v1a,(byte[])journal.getIndex(name).lookup(k1)); ! ! try { ! tx2.prepare(); ! fail("Expecting: "+ValidationError.class); ! } catch(ValidationError ex) { ! System.err.println("Ignoring expected exception: "+ex); ! assertTrue(tx2.isAborted()); ! } ! ! journal.close(); ! ! } ! ! /** ! * Test correct detection and resolution of a write-write conflict. An index ! * is registered with an {@link IConflictResolver} and the journal is ! * committed. Two transactions (tx1, tx2) are then started. Both ! * transactions write a value under the same key. tx1 prepares and commits. ! * tx2 attempts to prepare, and the test verifies that the conflict resolver ! * is invoked, that it may resolve the conflict causing validation to ! * succeed and that the value determined by conflict resolution is made ! * persistent when tx2 commits. ! */ ! public void test_writeWriteConflict_conflictIsResolved() { ! ! Properties properties = getProperties(); ! ! Journal journal = new Journal(properties); ! ! String name = "abc"; ! ! final byte[] k1 = new byte[] { 1 }; ! ! final byte[] v1a = new byte[] { 1 }; ! final byte[] v1b = new byte[] { 2 }; ! final byte[] v1c = new byte[] { 3 }; ! ! { ! ! 
/* ! * register an index with a conflict resolver and commit the ! * journal. ! */ ! ! journal.registerIndex(name, new UnisolatedBTree(journal, ! new SingleValueConflictResolver(k1, v1c))); ! ! journal.commit(); ! ! } ! ! /* ! * Create two transactions. ! */ ! ! ITx tx1 = journal.newTx(); ! ! ITx tx2 = journal.newTx(); ! ! /* ! * Write a value under the same key on the same index in both ! * transactions. ! */ ! ! tx1.getIndex(name).insert(k1, v1a); ! ! tx2.getIndex(name).insert(k1, v1b); ! ! tx1.prepare(); ! ! tx1.commit(); ! ! /* ! * verify that the value from tx1 is found under the key on the ! * unisolated index. ! */ ! assertEquals(v1a,(byte[])journal.getIndex(name).lookup(k1)); ! ! tx2.prepare(); ! ! // @todo the indices should probably become read only at this point. ! assertEquals(v1c,(byte[])tx2.getIndex(name).lookup(k1)); ! ! tx2.commit(); ! ! /* ! * verify that the resolved value is found under the key on the ! * unisolated index. ! */ ! assertEquals(v1c,(byte[])journal.getIndex(name).lookup(k1)); ! ! journal.close(); ! ! } ! ! /** ! * The concurrency control algorithm must not permit two transactions to ! * prepare at the same time since that violates the basic rules of ! * serializability. ! * ! * @todo javadoc and move into schedules test suite or its own test suite. ! */ ! public void test_serializability() { ! ! Properties properties = getProperties(); ! ! Journal journal = new Journal(properties); ! ! String name = "abc"; ! ! final byte[] k1 = new byte[] { 1 }; ! ! final byte[] v1a = new byte[] { 1 }; ! final byte[] v1b = new byte[] { 2 }; ! ! { ! ! /* ! * register an index and commit the journal. ! */ ! ! journal.registerIndex(name, new UnisolatedBTree(journal)); ! ! journal.commit(); ! ! } ! ! /* ! * Create two transactions. ! */ ! ! ITx tx1 = journal.newTx(); ! ! ITx tx2 = journal.newTx(); ! ! /* ! * Write a value under the same key on the same index in both ! * transactions. ! */ ! ! tx1.getIndex(name).insert(k1, v1a); ! ! tx2.getIndex(name).insert(k1, v1b); ! ! tx1.prepare(); ! ! try { ! tx2.prepare(); ! fail("Expecting: "+IllegalStateException.class); ! } catch(IllegalStateException ex) { ! System.err.println("Ignoring expected exception: "+ex); ! } ! ! journal.close(); ! ! } ! ! /** ! * Helper class used to resolve a predicted conflict to a known value. ! * ! * @author <a href="mailto:tho...@us...">Bryan Thompson</a> ! * @version $Id$ ! */ ! public static class SingleValueConflictResolver implements IConflictResolver ! { ! ! private final byte[] expectedKey; ! private final byte[] resolvedValue; ! ! private static final long serialVersionUID = -1161201507073182976L; ! ! public SingleValueConflictResolver(byte[] expectedKey, byte[] resolvedValue) { ! ! this.expectedKey = expectedKey; ! ! this.resolvedValue = resolvedValue; ! ! } ! ! public byte[] resolveConflict(byte[] key, Value comittedValue, ! Value txEntry) throws RuntimeException { ! ! assertEquals("key",expectedKey,key); ! ! return resolvedValue; ! ! 
} } Index: TestReadOnlyTx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestReadOnlyTx.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TestReadOnlyTx.java 17 Feb 2007 21:34:13 -0000 1.1 --- TestReadOnlyTx.java 19 Feb 2007 01:05:39 -0000 1.2 *************** *** 48,51 **** --- 48,56 ---- package com.bigdata.journal; + import java.util.Properties; + + import com.bigdata.isolation.UnisolatedBTree; + import com.bigdata.objndx.IIndex; + /** * Test suite for fully isolated read-only transactions. *************** *** 69,76 **** } ! public void test_something() { ! fail("write tests"); } --- 74,161 ---- } ! /** ! * Test verifies that you can not write on a read-only transaction. ! */ ! public void test_isReadOnly() { ! Properties properties = getProperties(); + Journal journal = new Journal(properties); + + String name = "abc"; + + final byte[] k1 = new byte[]{1}; + + final byte[] v1 = new byte[]{1}; + + { + + /* + * register an index, write on the index, and commit the journal. + */ + IIndex ndx = journal.registerIndex(name, new UnisolatedBTree( + journal)); + + ndx.insert(k1, v1); + + journal.commit(); + + } + + { + + /* + * create a read-only transaction, verify that we can read the + * value written on the index but that we can not write on the + * index. + */ + + ITx tx1 = journal.newTx(true); + + IIndex ndx = tx1.getIndex(name); + + assertNotNull(ndx); + + assertEquals((byte[])v1,(byte[])ndx.lookup(k1)); + + try { + ndx.insert(k1,new byte[]{1,2,3}); + fail("Expecting: "+UnsupportedOperationException.class); + } catch( UnsupportedOperationException ex) { + System.err.println("Ignoring expected exception: "+ex); + } + + tx1.prepare(); + + tx1.commit(); + + } + + { + + /* + * do it again, but this time we will abort the read-only + * transaction. + */ + + ITx tx1 = journal.newTx(true); + + IIndex ndx = tx1.getIndex(name); + + assertNotNull(ndx); + + assertEquals((byte[])v1,(byte[])ndx.lookup(k1)); + + try { + ndx.insert(k1,new byte[]{1,2,3}); + fail("Expecting: "+UnsupportedOperationException.class); + } catch( UnsupportedOperationException ex) { + System.err.println("Ignoring expected exception: "+ex); + } + + tx1.abort(); + + } + } --- NEW FILE: StressTestConcurrent.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. 
Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 18, 2007 */ package com.bigdata.journal; /** * Stress tests for concurrent transaction processing. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class StressTestConcurrent extends ProxyTestCase { public StressTestConcurrent() { } public StressTestConcurrent(String name) { super(name); } public void test_concurrentClients() { fail("write tests"); } } |
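The new StressTestConcurrent.test_concurrentClients is still a stub; a minimal sketch of one way it might be filled in, using only the Journal/ITx API already exercised by TestConflictResolution above. The client count and per-client keys are illustrative, and prepare/commit is serialized by hand in keeping with the serializability rule tested above:

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import com.bigdata.isolation.UnisolatedBTree;

// in com.bigdata.journal.StressTestConcurrent (Journal and ITx need no import there).
public void test_concurrentClients() throws Exception {

    final Journal journal = new Journal(getProperties());

    final String name = "abc";

    journal.registerIndex(name, new UnisolatedBTree(journal));

    journal.commit();

    final int nclients = 10;

    final AtomicInteger ncommitted = new AtomicInteger(0);

    Collection<Callable<Void>> tasks = new ArrayList<Callable<Void>>();

    for (int i = 0; i < nclients; i++) {

        final byte[] key = new byte[] { (byte) i };

        tasks.add(new Callable<Void>() {

            public Void call() {

                ITx tx = journal.newTx();

                // each client writes under its own key, so validation succeeds.
                tx.getIndex(name).insert(key, key);

                // serialize prepare/commit since concurrent prepares are
                // rejected (see test_serializability above).
                synchronized (journal) {

                    tx.prepare();

                    tx.commit();

                }

                ncommitted.incrementAndGet();

                return null;

            }

        });

    }

    final ExecutorService service = Executors.newFixedThreadPool(nclients);

    service.invokeAll(tasks); // blocks until all clients are done.

    service.shutdown();

    assertEquals("#committed", nclients, ncommitted.get());

    journal.close();

}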
From: Bryan T. <tho...@us...> - 2007-02-19 01:05:45
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/isolation In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv7992/src/java/com/bigdata/isolation Modified Files: IsolatedBTree.java UnisolatedBTree.java Log Message: Working on transaction processing support. Index: UnisolatedBTree.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/isolation/UnisolatedBTree.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** UnisolatedBTree.java 17 Feb 2007 21:34:21 -0000 1.5 --- UnisolatedBTree.java 19 Feb 2007 01:05:39 -0000 1.6 *************** *** 113,116 **** --- 113,123 ---- /** + * The default branching factor is chosen to be relatively small since that + * will cause less growth in the store without sacrificing too much + * performance. + */ + public static final int DEFAULT_BRANCHING_FACTOR = 32; + + /** * The optional conflict resolver. */ *************** *** 142,148 **** } /** ! * Create an isolated btree. * * @param store --- 149,179 ---- } + + /** + * Create an isolatable btree using a default branching factor. + * + * @param store + */ + public UnisolatedBTree(IRawStore store) { + + this(store, DEFAULT_BRANCHING_FACTOR, null); + + } /** ! * Create an isolatable btree using a default branching factor and the ! * specified {@link IConflictResolver}. ! * ! * @param store ! * @param conflictResolver ! */ ! public UnisolatedBTree(IRawStore store, IConflictResolver conflictResolver) { ! ! this(store, DEFAULT_BRANCHING_FACTOR, conflictResolver); ! ! } ! ! /** ! * Create an isolatable btree. * * @param store *************** *** 151,160 **** public UnisolatedBTree(IRawStore store, int branchingFactor) { ! this(store,branchingFactor, null); } /** ! * Create an isolated btree. * * @param store --- 182,191 ---- public UnisolatedBTree(IRawStore store, int branchingFactor) { ! this(store, branchingFactor, null); } /** ! * Create an isolatable btree. * * @param store Index: IsolatedBTree.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/isolation/IsolatedBTree.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** IsolatedBTree.java 17 Feb 2007 21:34:21 -0000 1.4 --- IsolatedBTree.java 19 Feb 2007 01:05:39 -0000 1.5 *************** *** 218,222 **** throw new IllegalArgumentException(); ! Value value = (Value) super.lookup(key); if (value == null) { --- 218,222 ---- throw new IllegalArgumentException(); ! Value value = (Value) super.getValue((byte[])key); if (value == null) { |
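In practice the new constructors line up like this (a quick sketch; journal and resolver are illustrative names for an open Journal and an application-supplied IConflictResolver):

// given: IRawStore journal (e.g., an open Journal) and IConflictResolver resolver.
UnisolatedBTree a = new UnisolatedBTree(journal);           // branching factor 32, no resolver.
UnisolatedBTree b = new UnisolatedBTree(journal, resolver); // branching factor 32, with resolver.
UnisolatedBTree c = new UnisolatedBTree(journal, 64, null); // explicit branching factor, no resolver.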
From: Bryan T. <tho...@us...> - 2007-02-17 23:15:31
Update of /cvsroot/cweb/bigdata-rdf/src/resources/logging In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv23767/src/resources/logging Modified Files: log4j.properties Log Message: Bracketed some logger statements so that their messages are only generated when needed. Index: log4j.properties =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/resources/logging/log4j.properties,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** log4j.properties 26 Jan 2007 18:22:16 -0000 1.1 --- log4j.properties 17 Feb 2007 23:15:26 -0000 1.2 *************** *** 11,14 **** --- 11,15 ---- log4j.logger.com.bigdata=WARN log4j.logger.com.bigdata.rdf=INFO + log4j.logger.com.bigdata.rdf.TripleStore=DEBUG log4j.logger.junit.framework.Test=INFO |
From: Bryan T. <tho...@us...> - 2007-02-17 23:15:31
Update of /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv23767/src/java/com/bigdata/rdf Modified Files: TripleStore.java Log Message: Bracketed some logger statements so that their messages are only generated when needed. Index: TripleStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/TripleStore.java,v retrieving revision 1.16 retrieving revision 1.17 diff -C2 -d -r1.16 -r1.17 *** TripleStore.java 17 Feb 2007 21:34:57 -0000 1.16 --- TripleStore.java 17 Feb 2007 23:15:27 -0000 1.17 *************** *** 58,61 **** --- 58,62 ---- import java.util.Properties; + import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.openrdf.model.Resource; *************** *** 213,218 **** --- 214,235 ---- public class TripleStore extends /*Partitioned*/Journal { + /** + * The logger for the {@link TripleStore} (shadows the logger for the + * journal). + */ static transient public Logger log = Logger.getLogger(TripleStore.class); + /** + * True iff the {@link #log} level is INFO or less. + */ + final public boolean INFO = log.getEffectiveLevel().toInt() <= Level.INFO + .toInt(); + + /** + * True iff the {@link #log} level is DEBUG or less. + */ + final public boolean DEBUG = log.getEffectiveLevel().toInt() <= Level.DEBUG + .toInt(); + /* * Declare indices for root addresses for the different indices maintained |
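The point of the new INFO/DEBUG flags is to let callers bracket relatively expensive log message construction; a sketch of the intended idiom (the message text and variable names are illustrative):

if (DEBUG) {
    // the string concatenation below is only evaluated when DEBUG logging is on.
    log.debug("loaded " + nloaded + " statements in " + elapsed + "ms");
}

Since the flags are final, they capture the effective log level once when the TripleStore is instantiated.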
From: Bryan T. <tho...@us...> - 2007-02-17 23:15:31
Update of /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv23767/src/java/com/bigdata/rdf/inf Modified Files: InferenceEngine.java Log Message: Bracketed some logger statements so that their messages are only generated when needed. Index: InferenceEngine.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/inf/InferenceEngine.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** InferenceEngine.java 17 Feb 2007 03:07:59 -0000 1.9 --- InferenceEngine.java 17 Feb 2007 23:15:26 -0000 1.10 *************** *** 331,334 **** --- 331,335 ---- long insertTime = System.currentTimeMillis() - insertStart; + if(DEBUG){ StringBuilder debug = new StringBuilder(); debug.append( "round #" ).append( round++ ).append( ": " ); *************** *** 338,341 **** --- 339,343 ---- debug.append( insertTime ).append( " millis " ); log.debug( debug.toString() ); + } } *************** *** 344,351 **** final int lastStatementCount = getStatementCount(); ! ! log.debug("Closed store in " + elapsed + "ms yeilding " + lastStatementCount + " statements total, " + (lastStatementCount - firstStatementCount) + " inferences"); } --- 346,355 ---- final int lastStatementCount = getStatementCount(); ! ! if(INFO) { ! log.info("Closed store in " + elapsed + "ms yielding " + lastStatementCount + " statements total, " + (lastStatementCount - firstStatementCount) + " inferences"); + } } |
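For comparison, log4j's own guard re-checks the effective level on every call, so it tracks runtime configuration changes at the cost of a per-call test; a sketch of that alternative (variable names are illustrative):

if (log.isDebugEnabled()) {
    log.debug("round #" + round + ": " + numAdded + " statements added");
}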
From: Bryan T. <tho...@us...> - 2007-02-17 21:35:01
Update of /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv12912/src/java/com/bigdata/rdf Modified Files: TempTripleStore.java TripleStore.java Log Message: Working through transactional isolation. Index: TempTripleStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/TempTripleStore.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TempTripleStore.java 17 Feb 2007 03:08:00 -0000 1.1 --- TempTripleStore.java 17 Feb 2007 21:34:57 -0000 1.2 *************** *** 48,69 **** package com.bigdata.rdf; - import java.io.BufferedReader; - import java.io.File; - import java.io.FileInputStream; - import java.io.IOException; - import java.io.InputStreamReader; - import java.io.Reader; - import java.util.Arrays; import java.util.Locale; - import java.util.Properties; import org.apache.log4j.Logger; - import org.openrdf.model.Resource; - import org.openrdf.model.URI; - import org.openrdf.model.Value; - import com.bigdata.journal.IJournal; - import com.bigdata.journal.Journal; - import com.bigdata.journal.RootBlockView; import com.bigdata.journal.TemporaryStore; import com.bigdata.objndx.BTree; --- 48,55 ---- *************** *** 73,214 **** import com.bigdata.rawstore.Bytes; import com.bigdata.rdf.inf.SPO; - import com.bigdata.rdf.model.OptimizedValueFactory.OSPComparator; - import com.bigdata.rdf.model.OptimizedValueFactory.POSComparator; - import com.bigdata.rdf.model.OptimizedValueFactory.SPOComparator; - import com.bigdata.rdf.model.OptimizedValueFactory.TermIdComparator; import com.bigdata.rdf.model.OptimizedValueFactory._Statement; - import com.bigdata.rdf.model.OptimizedValueFactory._Value; - import com.bigdata.rdf.model.OptimizedValueFactory._ValueSortKeyComparator; - import com.bigdata.rdf.rio.IRioLoader; - import com.bigdata.rdf.rio.PresortRioLoader; - import com.bigdata.rdf.rio.RioLoaderEvent; - import com.bigdata.rdf.rio.RioLoaderListener; - import com.bigdata.rdf.serializers.RdfValueSerializer; import com.bigdata.rdf.serializers.StatementSerializer; - import com.bigdata.rdf.serializers.TermIdSerializer; - import com.bigdata.scaleup.PartitionedIndex; - import com.bigdata.scaleup.PartitionedJournal; - import com.bigdata.scaleup.SlaveJournal; import com.ibm.icu.text.Collator; import com.ibm.icu.text.RuleBasedCollator; /** ! * A triple store based on the <em>bigdata</em> architecture. ! * ! * @todo Refactor to use a delegation mechanism so that we can run with or ! * without partitioned indices? (All you have to do now is change the ! * class that is being extended from Journal to PartitionedJournal and ! * handle some different initialization properties.) ! * ! * @todo Play with the branching factor again. Now that we are using overflow ! * to evict data onto index segments we can use a higher branching factor ! * and simply evict more often. Is this worth it? We might want a lower ! * branching factor on the journal since we can not tell how large any ! * given write will be and then use larger branching factors on the index ! * segments. ! * ! * @todo try loading some very large data sets; try Transient vs Disk vs Direct ! * modes. If Transient has significantly better performance then it ! * indicates that we are waiting on IO so introduce AIO support in the ! * Journal and try Disk vs Direct with aio. Otherwise, consider ! * refactoring the btree to have the values be variable length byte[]s ! 
* with serialization in the client and other tuning focused on IO (the ! * only questions with that approach are appropriate compression ! * techniques and handling transparently timestamps as part of the value ! * when using an isolated btree in a transaction). ! * ! * @todo the only added cost for a quad store is the additional statement ! * indices. There are only three more statement indices in a quad store. ! * Since statement indices are so cheap, it is probably worth implementing ! * them now, even if only as a configuration option. ! * ! * @todo verify read after commit (restart safe) for large data sets and test ! * re-load rate for a data set and verify that no new statements are ! * added. ! * ! * @todo add bulk data export (buffering statements and bulk resolving term ! * identifiers). ! * ! * @todo The use of long[] identifiers for statements also means that the SPO ! * and other statement indices are only locally ordered so they can not be ! * used to perform a range scan that is ordered in the terms without ! * joining against the various term indices and then sorting the outputs. ! * ! * @todo possibly save frequently seen terms in each batch for the next batch in ! * order to reduce unicode conversions. ! * ! * @todo support metadata about the statement, e.g., whether or not it is an ! * inference. ! * ! * @todo compute the MB/sec rate at which the store can load data and compare it ! * with the maximum transfer rate for the journal without the btree and ! * the maximum transfer rate to disk. this will tell us the overhead of ! * the btree implementation. ! * ! * @todo Try a variant in which we have metadata linking statements and terms ! * together. In this case we would have to go back to the terms and update ! * them to have metadata about the statement. it is a bit circular since ! * we can not create the statement until we have the terms and we can not ! * add the metadata to the terms until we have the statement. ! * ! * @todo Note that a very interesting solution for RDF places all data into a ! * statement index and then use block compression techniques to remove ! * frequent terms, e.g., the repeated parts of the value. Also note that ! * there will be no "value" for an rdf statement since existence is all. ! * The key completely encodes the statement. So, another approach is to ! * bit code the repeated substrings found within the key in each leaf. ! * This way the serialized key size reflects only the #of distinctions. ! * ! * @todo I've been thinking about rdfs stores in the light of the work on ! * bigdata. Transactional isolation for rdf is really quite simple. Since ! * lexicons (uri, literal or bnode indices) do not (really) support ! * deletion, the only acts are asserting term and asserting and retracting ! * statements. since assertion terms can lead to write-write conflicts, ! * which must be resolved and can cascade into the statement indices since ! * the statement key depends directly on the assigned term identifiers. a ! * statement always merges with an existing statement, inserts never cause ! * conflicts. Hence the only possible write-write conflict for the ! * statement indices is a write-delete conflict. quads do not really make ! * this more complex (or expensive) since merges only occur when there is ! * a context match. however entailments can cause twists depending on how ! * they are realized. ! * ! * If we do a pure RDF layer (vs RDF over GOM over bigdata), then it seems that ! 
* we could simple use a statement index (no lexicons for URIs, etc). Normally ! * this inflates the index size since you have lots of duplicate strings, but we ! * could just use block compression to factor out those strings when we evict ! * index leaves to disk. Prefix compression of keys will already do great things ! * for removing repetitive strings from the index nodes and block compression ! * will get at the leftover redundancy. ! * ! * So, one dead simple architecture is one index per access path (there is of ! * course some index reuse across the access paths) with the statements inline ! * in the index using prefix key compression and block compression to remove ! * redundancy. Inserts on this architecture would just send triples to the store ! * and the various indices would be maintained by the store itself. Those ! * indices could be load balanced in segments across a cluster. ! * ! * Since a read that goes through to disk reads an entire leaf at a time, the ! * most obvious drawback that I see is caching for commonly used assertions, but ! * that is easy to implement with some cache invalidation mechanism coupled to ! * deletes. ! * ! * I can also see how to realize very large bulk inserts outside of a ! * transactional context while handling concurrent transactions -- you just have ! * to reconcile as of the commit time of the bulk insert and you get to do that ! * using efficient compacting sort-merges of "perfect" bulk index segments. The ! * architecture would perform well on concurrent apstars style document loading ! * as well as what we might normally consider a bulk load (a few hundred ! * megabytes of data) within the normal transaction mechanisms, but if you ! * needed to ingest uniprot you would want to use a different technique :-) ! * outside of the normal transactional isolation mechanisms. * ! * I'm not sure what the right solution is for entailments, e.g., truth ! * maintenance vs eager closure. Either way, you would definitely want to avoid ! * tuple at a time processing and batch things up so as to minimize the #of ! * index tests that you had to do. So, handling entailments and efficient joins ! * for high-level query languages would be the two places for more thought. And ! * there are little odd spots in RDF - handling bnodes, typed literals, and the ! * concept of a total sort order for the statement index. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> --- 59,73 ---- import com.bigdata.rawstore.Bytes; import com.bigdata.rdf.inf.SPO; import com.bigdata.rdf.model.OptimizedValueFactory._Statement; import com.bigdata.rdf.serializers.StatementSerializer; import com.ibm.icu.text.Collator; import com.ibm.icu.text.RuleBasedCollator; /** ! * A temporary triple store based on the <em>bigdata</em> architecture. Data ! * is buffered in memory but will overflow to disk for large stores. * ! * @todo refactor to use a delegate pattern so that we can share code with ! * {@link TripleStore} while deriving from a different base class. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> Index: TripleStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata-rdf/src/java/com/bigdata/rdf/TripleStore.java,v retrieving revision 1.15 retrieving revision 1.16 diff -C2 -d -r1.15 -r1.16 *** TripleStore.java 12 Feb 2007 21:51:43 -0000 1.15 --- TripleStore.java 17 Feb 2007 21:34:57 -0000 1.16 *************** *** 63,69 **** import org.openrdf.model.Value; import com.bigdata.journal.IJournal; import com.bigdata.journal.Journal; - import com.bigdata.journal.RootBlockView; import com.bigdata.objndx.BTree; import com.bigdata.objndx.IIndex; --- 63,69 ---- import org.openrdf.model.Value; + import com.bigdata.journal.ICommitRecord; import com.bigdata.journal.IJournal; import com.bigdata.journal.Journal; import com.bigdata.objndx.BTree; import com.bigdata.objndx.IIndex; *************** *** 85,89 **** import com.bigdata.rdf.serializers.TermIdSerializer; import com.bigdata.scaleup.PartitionedIndex; - import com.bigdata.scaleup.PartitionedJournal; import com.bigdata.scaleup.SlaveJournal; import com.ibm.icu.text.Collator; --- 85,88 ---- *************** *** 220,224 **** * by the store. */ ! static public transient final int ROOT_COUNTER = RootBlockView.FIRST_USER_ROOT + 5; public RdfKeyBuilder keyBuilder; --- 219,223 ---- * by the store. */ ! static public transient final int ROOT_COUNTER = ICommitRecord.FIRST_USER_ROOT; public RdfKeyBuilder keyBuilder; *************** *** 349,353 **** long addr; ! if ((addr = getAddr(ROOT_COUNTER)) == 0L) { // Note: first termId is ONE (1). Zero is reserved. --- 348,352 ---- long addr; ! if ((addr = getRootAddr(ROOT_COUNTER)) == 0L) { // Note: first termId is ONE (1). Zero is reserved. *************** *** 1021,1029 **** } ! public void commit() { final long begin = System.currentTimeMillis(); ! super.commit(); final long elapsed = System.currentTimeMillis() - begin; --- 1020,1028 ---- } ! public long commit() { final long begin = System.currentTimeMillis(); ! final long commitTime = super.commit(); final long elapsed = System.currentTimeMillis() - begin; *************** *** 1033,1036 **** --- 1032,1037 ---- usage(); + return commitTime; + } |
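With commit() now returning the assigned commit time (0L when there was nothing to commit, per the IAtomicStore change in the next message), callers can capture the timestamp of their own commit point; a minimal sketch (the index name is illustrative):

Journal journal = new Journal(properties);

journal.registerIndex("abc", new UnisolatedBTree(journal));

final long commitTime = journal.commit();

if (commitTime == 0L) {

    // nothing was written since the last commit; no commit record was created.

} else {

    // commitTime identifies this commit point; the CommitRecordIndex added in
    // the next message maps such timestamps onto ICommitRecords.

}

journal.close();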
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv12401/src/java/com/bigdata/journal Modified Files: Tx.java ITx.java IRootBlockView.java RootBlockView.java TransactionServer.java IAtomicStore.java TemporaryStore.java IJournal.java Name2Addr.java FileMetadata.java Journal.java Added Files: CommitRecord.java LocalTimestampService.java CommitRecordIndex.java ICommitRecord.java ITimestampService.java IIndexManager.java CommitRecordSerializer.java Log Message: Working through transactional isolation. --- NEW FILE: CommitRecordIndex.java --- package com.bigdata.journal; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.CognitiveWeb.extser.LongPacker; import com.bigdata.objndx.BTree; import com.bigdata.objndx.BTreeMetadata; import com.bigdata.objndx.IValueSerializer; import com.bigdata.objndx.KeyBuilder; import com.bigdata.rawstore.Addr; import com.bigdata.rawstore.IRawStore; /** * BTree mapping commit times to {@link ICommitRecord}s. The keys are the commit * times (long integers). The values are {@link Entry} objects recording the commit time * and the {@link Addr address} of the {@link ICommitRecord} for that * commit time. */ public class CommitRecordIndex extends BTree { /** * @todo refactor to share with the {@link Journal}? */ private KeyBuilder keyBuilder = new KeyBuilder(); // /** // * Cache of added/retrieved commit records. // * // * @todo This only works for exact timestamp matches so the cache might not // * be very useful here. Also, this must be a weak value cache or it will // * leak memory. // */ // private Map<Long, ICommitRecord> cache = new HashMap<Long, ICommitRecord>(); public CommitRecordIndex(IRawStore store) { super(store, DEFAULT_BRANCHING_FACTOR, ValueSerializer.INSTANCE); } /** * Load from the store. * * @param store * The backing store. * @param metadata * The metadata record for the index. */ public CommitRecordIndex(IRawStore store, BTreeMetadata metadata) { super(store, metadata); } /** * Encodes the commit time into a key. * * @param commitTime * The commit time. * * @return The corresponding key. */ protected byte[] getKey(long commitTime) { return keyBuilder.reset().append(commitTime).getKey(); } /** * Existence test for a commit record with the specified commit timestamp. * * @param commitTime * The commit timestamp. * @return true iff such an {@link ICommitRecord} exists in the index with * that commit timestamp. */ public boolean hasTimestamp(long commitTime) { return super.contains(getKey(commitTime)); } /** * Return the {@link ICommitRecord} with the given timestamp (exact match). * * @param commitTime * The commit time. * * @return The {@link ICommitRecord} -or- <code>null</code> iff there * is no {@link ICommitRecord} for that commit time. */ public ICommitRecord get(long commitTime) { ICommitRecord commitRecord; // commitRecord = cache.get(commitTime); // // if (commitRecord != null) // return commitRecord; final Entry entry = (Entry) super.lookup(getKey(commitTime)); if (entry == null) { return null; } /* * re-load the commit record from the store. */ commitRecord = loadCommitRecord(store,entry.addr); // /* // * save commit time -> commit record mapping in transient cache (this // * only works for exact matches on the timestamp so the cache may // * not be very useful here.
// */ // cache.put(commitRecord.getTimestamp(),commitRecord); // return the commit record. return commitRecord; } /** * Return the {@link ICommitRecord} having the largest timestamp that is * less than or equal to the given timestamp. * * @param timestamp * The given timestamp. * * @return The commit record -or- <code>null</code> iff there is no * {@link ICommitRecord} in the index that satisfies the probe. * * @see #get(long) */ public ICommitRecord find(long timestamp) { final int index = findIndexOf(timestamp); if(index == -1) { // No match. return null; } // return the matched record. Entry entry = (Entry) super.valueAt( index ); return loadCommitRecord(store,entry.addr); } /** * Find the index of the {@link ICommitRecord} having the largest timestamp * that is less than or equal to the given timestamp. * * @return The index of the {@link ICommitRecord} having the largest * timestamp that is less than or equal to the given timestamp -or- * <code>-1</code> iff there is no {@link ICommitRecord} whose * timestamp is less than or equal to the given timestamp. */ public int findIndexOf(long timestamp) { int pos = super.indexOf(getKey(timestamp)); if (pos < 0) { /* * The key lies between the entries in the index, or possibly before * the first entry in the index. [pos] represents the insert * position. We convert it to an entry index and subtract one to get * the index of the last commit record whose timestamp is less than * the given timestamp. */ pos = -(pos+1); if(pos == 0) { // No entry is less than or equal to this timestamp. return -1; } pos--; return pos; } else { /* * exact hit on an entry. */ return pos; } } /** * Re-load a commit record from the store. * * @param store * The store. * @param addr * The address of the {@link ICommitRecord}. * * @return The {@link ICommitRecord} loaded from the specified address. */ protected ICommitRecord loadCommitRecord(IRawStore store, long addr) { return CommitRecordSerializer.INSTANCE.deserialize(store.read(addr, null)); } /** * Add an entry for a commit record. * * @param commitRecord * The commit record. * * @param commitRecordAddr * The address at which that commit record was written on the * store. * * @exception IllegalArgumentException * if <i>commitRecord</i> is <code>null</code>. * @exception IllegalArgumentException * if there is already an {@link ICommitRecord} registered * for that {@link ICommitRecord#getTimestamp()}. * @exception IllegalArgumentException * if <i>commitRecordAddr</i> is invalid. */ public void add(long commitRecordAddr, ICommitRecord commitRecord) { if (commitRecord == null) throw new IllegalArgumentException(); if (commitRecordAddr == 0L) throw new IllegalArgumentException(); final long commitTime = commitRecord.getTimestamp(); final byte[] key = getKey(commitTime); if(super.contains(key)) { throw new IllegalArgumentException( "commit record exists: timestamp=" + commitTime); } // add an entry to the persistent index. super.insert(key,new Entry(commitTime,commitRecordAddr)); // // add to the transient cache. // cache.put(commitTime, commitRecord); } /** * An entry in the persistent index. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public static class Entry { /** * The commit time. */ public final long commitTime; /** * The {@link Addr address} of the {@link ICommitRecord} whose commit * timestamp is {@link #commitTime}. */ public final long addr; public Entry(long commitTime,long addr) { this.commitTime = commitTime; this.addr = addr; } } /** * The values are {@link Entry}s.
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public static class ValueSerializer implements IValueSerializer { /** * */ private static final long serialVersionUID = 8085229450005958541L; public static transient final IValueSerializer INSTANCE = new ValueSerializer(); final public static transient int VERSION0 = 0x0; public ValueSerializer() { } public void putValues(DataOutputStream os, Object[] values, int n) throws IOException { os.writeInt(VERSION0); for (int i = 0; i < n; i++) { Entry entry = (Entry) values[i]; LongPacker.packLong(os, entry.commitTime); LongPacker.packLong(os, entry.addr); } } public void getValues(DataInputStream is, Object[] values, int n) throws IOException { final int version = is.readInt(); if (version != VERSION0) throw new RuntimeException("Unknown version: " + version); for (int i = 0; i < n; i++) { final long commitTime = Long.valueOf(LongPacker.unpackLong(is)); final long addr = Long.valueOf(LongPacker.unpackLong(is)); values[i] = new Entry(commitTime,addr); } } } } --- NEW FILE: ITimestampService.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 16, 2007 */ package com.bigdata.journal; /** * A service for unique timestamps. * * @todo define a low-latency implementation for use with a distributed database * commit protocol. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public interface ITimestampService { /** * Return the next unique timestamp. */ public long nextTimestamp(); } --- NEW FILE: CommitRecordSerializer.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. 
See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ package com.bigdata.journal; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import org.CognitiveWeb.extser.LongPacker; import com.bigdata.io.ByteBufferInputStream; import com.bigdata.rawstore.Bytes; /** * A helper class for (de-)serializing the root addresses. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @todo we could use run length encoding to write only the used roots. */ public class CommitRecordSerializer { public static final int VERSION0 = 0x0; public static final transient CommitRecordSerializer INSTANCE = new CommitRecordSerializer(); public byte[] serialize(ICommitRecord commitRecord) { final long timestamp = commitRecord.getTimestamp(); final int n = commitRecord.getRootAddrCount(); try { ByteArrayOutputStream baos = new ByteArrayOutputStream( n * Bytes.SIZEOF_LONG); DataOutputStream dos = new DataOutputStream(baos); dos.writeInt(VERSION0); dos.writeLong(timestamp); LongPacker.packLong(dos, n); for(int i=0; i<n; i++) { LongPacker.packLong(dos, commitRecord.getRootAddr(i)); } return baos.toByteArray(); } catch (IOException ex) { throw new RuntimeException(ex); } } public ICommitRecord deserialize(ByteBuffer buf) { try { ByteBufferInputStream bbis = new ByteBufferInputStream(buf); DataInputStream dis = new DataInputStream(bbis); final int version = dis.readInt(); if (version != VERSION0) throw new RuntimeException("Unknown version: " + version); final long timestamp = dis.readLong(); final int n = (int)LongPacker.unpackLong(dis); final long[] roots = new long[n]; for (int i = 0; i < n; i++) { roots[i] = LongPacker.unpackLong(dis); } return new CommitRecord(timestamp,roots); } catch (IOException ex) { throw new RuntimeException(ex); } } } --- NEW FILE: ICommitRecord.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. 
Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 16, 2007 */ package com.bigdata.journal; import com.bigdata.rawstore.Addr; /** * An interface providing a read-only view of a commit record. A commit record * is written on each commit. The basic metadata in the commit record are the * root addresses from which various critical resources may be loaded, e.g., * data structures for mapping index names to their addresses on the * {@link Journal}, etc. The {@link Journal} maintains an * {@link Journal#getCommitRecord()} index over the commit records so that * {@link Tx transactions} can rapidly recover the commit record corresponding * to their historical, read-only ground state. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @todo consider modifying to allow named root addresses for a small #of root * names. E.g., a set of {name,addr} pairs that is either scanned as an * array list or looked up using a hash table. */ public interface ICommitRecord { /** * The #of root ids. Their indices are [0:N-1]. */ static public final int MAX_ROOT_ADDRS = 50; /** * The first root address that may be used for a user-defined object. * User-defined root addresses begin at index 10. The first 10 root addresses are * reserved for use by the bigdata architecture. */ static public final int FIRST_USER_ROOT = 10; /** * The timestamp assigned to this commit record -or- <code>0L</code> iff * there is no {@link ICommitRecord} written on the {@link Journal}. */ public long getTimestamp(); /** * The #of allowed root addresses. */ public int getRootAddrCount(); /** * The last address stored at the specified root index in this * commit record. * * @param index * The index of the root {@link Addr address}. * * @return The {@link Addr address} stored at that index. * * @exception IndexOutOfBoundsException * if the index is negative or too large. */ public long getRootAddr(int index); } Index: Name2Addr.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Name2Addr.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** Name2Addr.java 9 Feb 2007 18:56:59 -0000 1.2 --- Name2Addr.java 17 Feb 2007 21:34:17 -0000 1.3 *************** *** 146,150 **** /* re-load btree from the store. */ ! btree = loadBTree(store,entry.name,entry.addr); // save name -> btree mapping in transient cache. --- 146,150 ---- /* re-load btree from the store. */ ! btree = BTreeMetadata.load(this.store, entry.addr); // save name -> btree mapping in transient cache. *************** *** 157,183 **** /** - * Re-load a named index from the store. - * <p> - * The default implementation uses the {@link BTree} constructor. In you - * need to return either a subclass of {@link BTree} or another - * implementation of {@link IIndex} then you MUST override this method to - * use the appropriate constructor. - * - * @param store - * The store. - * @param name - * The index name. - * @param addr - * The address of the metadata record. - * - * @return The named index as loaded from the specified address.
- */ - protected IIndex loadBTree(IRawStore store, String name, long addr) { - - return new BTree(this.store, BTreeMetadata.read(this.store, addr)); - - } - - /** * Add an entry for the named index. * --- 157,160 ---- Index: IJournal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/IJournal.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** IJournal.java 10 Feb 2007 15:37:44 -0000 1.4 --- IJournal.java 17 Feb 2007 21:34:17 -0000 1.5 *************** *** 64,68 **** * @version $Id$ */ ! public interface IJournal extends IRawStore, IAtomicStore, IStore { /** --- 64,68 ---- * @version $Id$ */ ! public interface IJournal extends IRawStore, IAtomicStore, IIndexManager { /** *************** *** 89,139 **** /** - * Register a named index. Once registered the index will participate in - * atomic commits. - * <p> - * Note: A named index must be registered outside of any transaction before - * it may be used inside of a transaction. - * <p> - * Note: The return object MAY differ from the supplied {@link BTree}. For - * example, when using partitioned indices the {@link BTree} is encapsulated - * within an abstraction that knows how to managed index partitions. - * - * @param name - * The name that can be used to recover the index. - * - * @param btree - * The btree. - * - * @return The object that would be returned by {@link #getIndex(String)}. - * - * @todo The provided {@link BTree} must serve as a prototype so that it is - * possible to retain additional metadata. - */ - public IIndex registerIndex(String name, IIndex btree); - - /** - * Drops the named index (unisolated). The index is removed as a - * {@link ICommitter} and all resources dedicated to that index are - * reclaimed, including secondary index segment files, the metadata index, - * etc. - * - * @param name - * The name of the index to be dropped. - * - * @exception IllegalArgumentException - * if <i>name</i> does not identify a registered index. - * - * @todo add a rename index method, but note that names in the file system - * would not change. - * - * @todo declare a method that returns or visits the names of the registered - * indices. - * - * @todo consider adding a delete method that releases all resources - * (including indices) and then closes the journal. - */ - public void dropIndex(String name); - - /** * Return the named index which MAY may be invalidated by a * {@link IAtomicStore#commit()}. --- 89,92 ---- Index: IAtomicStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/IAtomicStore.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** IAtomicStore.java 15 Feb 2007 01:34:22 -0000 1.3 --- IAtomicStore.java 17 Feb 2007 21:34:17 -0000 1.4 *************** *** 68,71 **** --- 68,74 ---- * Request an atomic commit. * + * @return The timestamp assigned to the {@link ICommitRecord} -or- 0L if + * there were no data to commit. + * * @exception IllegalStateException * if the store is not open. *************** *** 73,77 **** * if the store is not writable. */ ! public void commit(); /** --- 76,80 ---- * if the store is not writable. */ ! public long commit(); /** *************** *** 107,116 **** * committed state of the store. * ! * @param rootSlot ! * The root slot identifier. * ! * @return The {@link Addr address} written in that slot. */ ! 
public long getAddr(int rootSlot); } --- 110,122 ---- * committed state of the store. * ! * @param index ! * The index of the root {@link Addr address}. * ! * @return The {@link Addr address} stored at that index. ! * ! * @exception IndexOutOfBoundsException ! * if the index is negative or too large. */ ! public long getRootAddr(int index); } Index: FileMetadata.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/FileMetadata.java,v retrieving revision 1.11 retrieving revision 1.12 diff -C2 -d -r1.11 -r1.12 *** FileMetadata.java 15 Feb 2007 20:59:21 -0000 1.11 --- FileMetadata.java 17 Feb 2007 21:34:18 -0000 1.12 *************** *** 1,2 **** --- 1,45 ---- + /** + + The Notice below must appear in each file of the Source Code of any + copy you distribute of the Licensed Product. Contributors to any + Modifications may add their own copyright notices to identify their + own contributions. + + License: + + The contents of this file are subject to the CognitiveWeb Open Source + License Version 1.1 (the License). You may not copy or use this file, + in either source code or executable form, except in compliance with + the License. You may obtain a copy of the License from + + http://www.CognitiveWeb.org/legal/license/ + + Software distributed under the License is distributed on an AS IS + basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + the License for the specific language governing rights and limitations + under the License. + + Copyrights: + + Portions created by or assigned to CognitiveWeb are Copyright + (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact + information for CognitiveWeb is available at + + http://www.CognitiveWeb.org + + Portions Copyright (c) 2002-2003 Bryan Thompson. + + Acknowledgements: + + Special thanks to the developers of the Jabber Open Source License 1.0 + (JOSL), from which this License was derived. This License contains + terms that differ from JOSL. + + Special thanks to the CognitiveWeb Open Source Contributors for their + suggestions and support of the Cognitive Web. + + Modifications: + + */ package com.bigdata.journal; *************** *** 462,472 **** nextOffset = 0; final long commitCounter = 0L; final long firstTxId = 0L; final long lastTxId = 0L; ! final long[] rootIds = new long[ RootBlockView.MAX_ROOT_ADDRS ]; IRootBlockView rootBlock0 = new RootBlockView(true, segmentId, ! nextOffset, firstTxId, lastTxId, commitCounter, rootIds); IRootBlockView rootBlock1 = new RootBlockView(false, segmentId, ! nextOffset, firstTxId, lastTxId, commitCounter, rootIds); FileChannel channel = raf.getChannel(); channel.write(rootBlock0.asReadOnlyBuffer(), OFFSET_ROOT_BLOCK0); --- 505,519 ---- nextOffset = 0; final long commitCounter = 0L; + final long commitTimestamp = 0L; final long firstTxId = 0L; final long lastTxId = 0L; ! final long commitRecordAddr = 0L; ! final long commitRecordIndexAddr = 0L; IRootBlockView rootBlock0 = new RootBlockView(true, segmentId, ! nextOffset, firstTxId, lastTxId, commitTimestamp, ! commitCounter, commitRecordAddr, commitRecordIndexAddr); IRootBlockView rootBlock1 = new RootBlockView(false, segmentId, ! nextOffset, firstTxId, lastTxId, commitTimestamp, ! 
commitCounter, commitRecordAddr, commitRecordIndexAddr); FileChannel channel = raf.getChannel(); channel.write(rootBlock0.asReadOnlyBuffer(), OFFSET_ROOT_BLOCK0); Index: TransactionServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/TransactionServer.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TransactionServer.java 8 Nov 2006 15:18:13 -0000 1.1 --- TransactionServer.java 17 Feb 2007 21:34:17 -0000 1.2 *************** *** 89,93 **** * target. Achieving higher concurrency will require more localized * mechanisms, e.g., single row atomic updates. There is also a use case ! * for read uncommitted transactions in order to permit earlier GC with * very very long running transactions, which would otherwise defer GC * until their completion. --- 89,93 ---- * target. Achieving higher concurrency will require more localized * mechanisms, e.g., single row atomic updates. There is also a use case ! * for read committed transactions in order to permit earlier GC with * very very long running transactions, which would otherwise defer GC * until their completion. *************** *** 514,515 **** --- 514,702 ---- } + + //* + //* FIXME Detect transaction identifiers that go backwards? For example tx0 + //* starts on one segment A while tx1 starts on segment B. Tx0 later starts + //* on segment B. From the perspective of segment B, tx0 begins after tx1. + //* This does not look like a problem unless there is an intevening commit, + //* at which point tx0 and tx1 will have starting contexts that differ by the + //* write set of the commit.<br> + //* What exactly is the impact when transactions start out of sequence? Do we + //* need to negotiated a distributed start time among all segments on which + //* the transaction starts? That would be a potential source of latency and + //* other kinds of pain. Look at how this is handled in the literature. One + //* way to handle it is essentially to declare the intention of the + //* transaction and pre-notify segments that will be written. This requires + //* some means of figuring out that intention and is probably relevant (and + //* solvable) only for very large row or key scans. + //* + //* @todo What exactly is the impact when transactions end out of sequence? I + //* presume that this is absolutely Ok. + + + // /** + // * <p> + // * Deallocate slots for versions having a transaction timestamp less than + // or + // * equal to <i>timestamp</i> that have since been overwritten (or deleted) + // * by a committed transaction having a timestamp greater than + // <i>timestamp</i>. + // * </p> + // * <p> + // * The criteria for deallocating historical versions is that (a) there is + // a + // * more recent version; and (b) there is no ACTIVE (vs PENDING or + // COMPLETED) + // * transaction which could read from that historical version. The journal + // * does NOT locally have enough information to decide when it can swept + // * historical versions written by a given transaction. This notice MUST + // come + // * from a transaction service which has global knowledge of which + // * transactions have PREPARED or ABORTED and can generate notices when all + // * transactions before a given timestamp have been PREPARED or ABORTED. + // For + // * example, a long running transaction can cause notice to be delayed for + // * many short lived transactions that have since completed. 
Once the long + // * running transaction completes, the transaction server can compute the + // * largest timestamp value below which there are no active transactions + // and + // * generate a single notice with that timestamp. + // * </p> + // * + // * @param timestamp + // * The timestamp. + // * + // * @todo This operation MUST be extremely efficient. + // * + // * @todo This method is exposed supposing a transaction service that will + // * deliver notice when the operation should be conducted based on + // * total knowledge of the state of all transactions running against + // * the distributed database. As such, it may have to scan the journal + // * to locate the commit record for transactions satisfying the + // * timestamp criteria. + // */ + // void gcTx( long timestamp ) { + + // // * <p> + // // * Note: Migration to the read-optimized database is NOT a + // pre-condition for + // // * deallocation of historical versions - rather it enables us to remove + // the + // // * <em>current</em> committed version from the journal. + // // * </p> + + // /* + // * FIXME Implement garbage collection of overwritten and unreachable + // * versions. Without migration to a read-optimized database, GC by + // * itself is NOT sufficient to allow us to deallocate versions that have + // * NOT been overwritten and hence is NOT sufficient to allow us to + // * discard historical transactions in their entirety. + // * + // * Given a transaction Tn that overwrites one or more pre-existing + // * versions, the criteria for deallocation of the overwritten versions + // * are: + // * + // * (A) Tn commits, hence its intention has been made persistent; and + // * + // * (B) There are no active transactions remaining that started from a + // * committed state before the commit state resulting from Tn, hence the + // * versions overwritten by Tn are not visible to any active transaction. + // * Any new transaction will read through the committed state produced by + // * Tn and will perceive the new versions rather than the overwritten + // * versions. + // * + // * Therefore, once Tn commits (assuming it has overwritten at least one + // * pre-existing version), we can add each concurrent transaction Ti that + // * is still active when Tn commits to a set of transactions that must + // * either validate or abort before we may GC(Tn). Since Tn has committed + // * it is not possible for new transactions to be created that would have + // * to be included in this set since any new transaction would start from + // * the committed state of Tn or its successors in the serialization + // * order. As transactions validate or abort they are removed from + // * GC(Tn). When this set is empty, we garbage collect the pre-existing + // * versions that were overwritten by Tn. + // * + // * The sets GC(T) must be restart safe. Changes to the set can only + // * occur when a transaction commits or aborts. However, even the abort + // * of a transaction MUST be noticeable on restart. + // * + // * A summary may be used that is the highest transaction timestamp for + // * which Tn must wait before running GC(Tn). That can be written once + // * + // * + // * Note that multiple transactions may have committed, so we may find + // * that Tn has successors in the commit/serialization order that also + // * meet the above criteria. All such committed transactions may be + // * processed at once, but they MUST be processed in their commit order.
+ // * + // * Once those pre-conditions have been met the following algorithm is + // * applied to GC the historical versions that were overwritten by Tn: + // * + // * 1. For each write by Ti where n < i <= m that overwrote a historical + // * version, deallocate the slots for that historical version. This is + // * valid since there are no active transactions that can read from that + // * historical state. The processing order for Ti probably does not + // * matter, but in practice there may be a reason to choose the + // * serialization or reverse serialization order. + // * + // * ---- this is getting close but is not yet correct ---- + // * + // * All versions written by a given transaction have the timestamp of + // * that transaction. + // * + // * The committed transactions are linked by their commit records into a + // * reverse serialization sequence. + // * + // * Each committed transaction has an object index that is accessible + // * from its commit record. The entries in this index are versions that + // * were written (or deleted) by that transaction. This index reads + // * through into the object index for the committed state of the journal + // * from which the transaction was minted. + // * + // * We could maintain in the entry information about the historical + // * version that was overwritten. For example, its firstSlot or a run + // * length encoding of the slots allocated to the historical version. + // * + // * We could maintain an index for all overwritten versions from + // * [timestamp + dataId] to [slotId] (or a run-length encoding of the + // * slots on which the version was written). Given a timestamp, we would + // * then do a key scan from the start of the index for all entries whose + // * timestamp was less than or equal to the given timestamp. For each + // * such entry, we would deallocate the version and delete the entry from + // * the index. + // * + // * tx0 : begin tx0 : write id0 (v0) tx0 : commit journal : deallocate <= + // * tx0 (NOP since no overwritten versions) + // * + // * tx1 : begin tx2 : begin tx1 : write id0 (v1) tx1 : commit journal : + // * deallocate <= tx1 (MUST NOT BE GENERATED since dependencies exist : + // * tx1 and tx0 both depend on the committed state of tx0 -- sounds like + // * lock style dependencies for deallocation !) tx2 : commit journal : + // * deallocate <= tx2 + // * + // * index:: [ tx0 : id0 ] : v0 [ tx1 : id1 ] : v1 + // * + // * keyscan <= tx2 + // */ + + // } + + // /** + // * The transaction identifier of the last transaction begun on this + // journal. + // * In order to avoid extra IO this value survives restart IFF there is an + // * intervening commit by any active transaction. This value is used to + // * reject transactions whose identifier arrives out of sequence at the + // * journal. + // * + // * @return The transaction identifier or <code>-1</code> if no + // * transactions have begun on the journal (or if no transactions + // * have ever committed and no transaction has begun since restart). + // */ + // public long getLastBegunTx() { + // + // return lastBegunTx; + // + // } + // private long lastBegunTx = -1; + --- NEW FILE: LocalTimestampService.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License).
You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 16, 2007 */ package com.bigdata.journal; import com.bigdata.util.TimestampFactory; /** * A purely local implementation of an {@link ITimestampService} using a * {@link TimestampFactory} to assign distinct timestamps. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class LocalTimestampService implements ITimestampService { public static final transient ITimestampService INSTANCE = new LocalTimestampService(); public long nextTimestamp() { return TimestampFactory.nextNanoTime(); } } Index: Tx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Tx.java,v retrieving revision 1.28 retrieving revision 1.29 diff -C2 -d -r1.28 -r1.29 *** Tx.java 15 Feb 2007 22:01:18 -0000 1.28 --- Tx.java 17 Feb 2007 21:34:17 -0000 1.29 *************** *** 54,60 **** import com.bigdata.isolation.IsolatedBTree; import com.bigdata.isolation.UnisolatedBTree; - import com.bigdata.objndx.BTree; import com.bigdata.objndx.IIndex; import com.bigdata.objndx.IndexSegment; import com.bigdata.rawstore.IRawStore; import com.bigdata.scaleup.MetadataIndex; --- 54,60 ---- import com.bigdata.isolation.IsolatedBTree; import com.bigdata.isolation.UnisolatedBTree; import com.bigdata.objndx.IIndex; import com.bigdata.objndx.IndexSegment; + import com.bigdata.objndx.ReadOnlyIndex; import com.bigdata.rawstore.IRawStore; import com.bigdata.scaleup.MetadataIndex; *************** *** 66,73 **** * named indices, and perform operations on those indices, and the operations * will be isolated according to the isolation level of the transaction. When ! * using an isolated transaction, writes are accumulated in an * {@link IsolatedBTree}. The write set is validated when the transaction * {@link #prepare()}s and finally merged down onto the global state when the ! * transaction commits. * </p> * --- 66,74 ---- * named indices, and perform operations on those indices, and the operations * will be isolated according to the isolation level of the transaction. When ! * using a writable isolated transaction, writes are accumulated in an * {@link IsolatedBTree}. The write set is validated when the transaction * {@link #prepare()}s and finally merged down onto the global state when the ! * transaction commits. When the transaction is read-only, writes will be ! * rejected and {@link #prepare()} and {@link #commit()} are NOPs. * </p> * *************** *** 75,83 **** * @version $Id$ * ! 
* @todo Transactions may be requested that are read-only for some historical ! * timestamp, that are read-committed (data committed by _other_ * transactions during the transaction will be visible within that ! * transaction), or that are fully isolated (changes made in other ! * transactions are not visible within the transaction). * * @todo The write set of a transaction is currently written onto an independent --- 76,95 ---- * @version $Id$ * ! * @todo Support read-committed transactions (data committed by _other_ * transactions during the transaction will be visible within that ! * transaction). Read-committed transactions do NOT permit writes (they ! * are read-only). Prepare and commit are NOPs. This might be a distinct ! * implementation sharing a common base class for handling the run state ! * stuff, or just a distinct implementation altogether. The read-committed ! * transaction is implemented by just reading against the named indices on ! * the journal. However, since commits or overflows of the journal might ! * invalidate the index objects we may have to set up a delegation ! * mechanism that resolves the named index either on each operation or ! * whenever the transaction receives notice that the index must be ! * discarded. In order to NOT see in-progress writes, the read-committed ! * transaction actually needs to dynamically resolve the most recent ! * {@link ICommitRecord} and then use the named indices resolved from ! * that. This suggests an event notification mechanism for commits so that ! * we can get the commit record more cheaply. * * @todo The write set of a transaction is currently written onto an independent *************** *** 105,109 **** * (so we don't throw it away until no transactions can reach back that * far) as well as an index into the named indices index -- perhaps simply ! * an index by timestamp into the root addresses (or whole root block * views, or moving the root addresses out of the root block and into the * store with only the address of the root addresses in the root block). --- 117,121 ---- * (so we don't throw it away until no transactions can reach back that * far) as well as an index into the named indices index -- perhaps simply ! * an index by startTimestamp into the root addresses (or whole root block * views, or moving the root addresses out of the root block and into the * store with only the address of the root addresses in the root block). *************** *** 138,144 **** /** ! * The timestamp assigned to this transaction. */ ! final protected long timestamp; /** --- 150,171 ---- /** ! * The start timestamp assigned to this transaction. ! * <p> ! * Note: Transaction {@link #startTimestamp} and {@link #commitTimestamp}s ! * are assigned by a global time service. The time service must provide ! * unique times for transaction start and commit timestamps and for commit ! * times for unisolated {@link Journal#commit()}s. */ ! final protected long startTimestamp; ! ! /** ! * The commit timestamp assigned to this transaction. ! */ ! private long commitTimestamp; ! ! /** ! * True iff the transaction is read only and will reject writes. ! */ ! final protected boolean readOnly; /** *************** *** 148,151 **** --- 175,185 ---- + /** + * The historical {@link ICommitRecord} chosen as the ground state for this + * transaction. All indices isolated by this transaction are isolated as of + * the discoverable root address based on this commit record.
+ */ + final private ICommitRecord commitRecord; + private RunState runState; *************** *** 165,232 **** /** ! * BTrees isolated by this transaction. */ ! private Map<String, IsolatedBTree> btrees = new HashMap<String, IsolatedBTree>(); /** ! * Return a named index. The index will be isolated at the same level as ! * this transaction. Changes on the index will be made restart-safe iff the ! * transaction successfully commits. ! * ! * @param name ! * The name of the index. ! * ! * @return The named index or <code>null</code> if no index is registered ! * under that name. */ ! public IIndex getIndex(String name) { ! ! if(name==null) throw new IllegalArgumentException(); ! ! /* ! * store the btrees in hash map so that we can recover the same instance ! * on each call within the same transaction. ! */ ! BTree btree = btrees.get(name); ! ! if(btree == null) { ! ! /* ! * see if the index was registered. ! * ! * FIXME this gets the last committed unisolated index. We need to ! * add a timestamp parameter and look up the appropriate metadata ! * record based on both the name and the timestamp (first metadata ! * record for the named index having a timestamp LT the transaction ! * timestamp). since this is always a request for historical and ! * read-only data, this method should not register a committer and ! * the returned btree should not participate in the commit protocol. ! */ ! UnisolatedBTree src = (UnisolatedBTree)journal.getIndex(name); ! ! // the named index was never registered. ! if(name==null) return null; ! ! /* ! * Isolate the named btree. ! */ ! return new IsolatedBTree(tmpStore,src); ! ! ! } ! ! return btree; ! } ! ! /** ! * This method must be invoked any time a transaction completes (aborts or ! * commits) in order to release the hard references to any named btrees ! * isolated within this transaction so that the JVM may reclaim the space ! * allocated to them on the heap. ! */ ! private void releaseBTrees() { ! ! btrees.clear(); } --- 199,212 ---- /** ! * Indices isolated by this transaction. */ ! private Map<String, IIndex> btrees = new HashMap<String, IIndex>(); /** ! * Create a fully isolated read-write transaction. */ ! public Tx(Journal journal,long timestamp) { ! this(journal,timestamp,false); } *************** *** 234,249 **** /** * Create a transaction starting the last committed state of the journal as ! * of the specified timestamp. * * @param journal * The journal. * ! * @param timestamp ! * The timestamp. * * @exception IllegalStateException * if the transaction state has been garbage collected. */ ! public Tx(Journal journal, long timestamp ) { if (journal == null) --- 214,238 ---- /** * Create a transaction starting the last committed state of the journal as ! * of the specified startTimestamp. * * @param journal * The journal. * ! * @param startTimestamp ! * The startTimestamp, which MUST be assigned consistently based on a ! * {@link ITimestampService}. Note that a transaction does not ! * start up on all {@link Journal}s at the same time. Instead, ! * the transaction start timestamp is assigned by a centralized ! * time service and then provided each time a transaction object ! * must be created for isolation on some {@link Journal}. ! * ! * @param readOnly ! * When true, the transaction will reject writes and ! * {@link #prepare()} and {@link #commit()} will be NOPs. * * @exception IllegalStateException * if the transaction state has been garbage collected. */ !
public Tx(Journal journal, long timestamp, boolean readOnly) { if (journal == null) *************** *** 252,257 **** this.journal = journal; ! this.timestamp = timestamp; journal.activateTx(this); --- 241,248 ---- this.journal = journal; ! this.startTimestamp = timestamp; + this.readOnly = readOnly; + journal.activateTx(this); *************** *** 262,265 **** --- 253,263 ---- */ this.commitCounter = journal.getRootBlockView().getCommitCounter(); + + /* + * The commit record serving as the ground state for the indices + * isolated by this transaction (MAY be null, in which case the + * transaction will be unable to isolate any indices). + */ + this.commitRecord = journal.getCommitRecord(timestamp); this.runState = RunState.ACTIVE; *************** *** 268,278 **** /** ! * The transaction identifier (aka timestamp). * ! * @return The transaction identifier (aka timestamp). */ ! final public long getId() { ! return timestamp; } --- 266,311 ---- /** ! * The transaction identifier. * ! * @return The transaction identifier. ! * ! * @todo verify that this has the semantics of the transaction start time ! * and that the startTimestamp is (must be) assigned by the same service ! * that assigns the {@link #getCommitTimestamp()}. */ ! final public long getStartTimestamp() { ! return startTimestamp; ! ! } ! ! /** ! * Return the commit timestamp assigned to this transaction. ! * ! * @return The commit timestamp assigned to this transaction. ! * ! * @exception IllegalStateException ! * unless the transaction is writable and {@link #isPrepared()} ! * or {@link #isCommitted()}. ! */ ! final public long getCommitTimestamp() { ! ! if(readOnly) { ! ! throw new IllegalStateException(); ! ! } ! ! switch(runState) { ! case ACTIVE: ! case ABORTED: ! throw new IllegalStateException(); ! case PREPARED: ! case COMMITTED: ! if(commitTimestamp == 0L) throw new AssertionError(); ! return commitTimestamp; ! } ! ! throw new AssertionError(); } *************** *** 280,284 **** public String toString() { ! return ""+timestamp; } --- 313,329 ---- public String toString() { ! return ""+startTimestamp; ! ! } ! ! /** ! * This method must be invoked any time a transaction completes (aborts or ! * commits) in order to release the hard references to any named btrees ! * isolated within this transaction so that the JVM may reclaim the space ! * allocated to them on the heap. ! */ ! private void releaseBTrees() { ! ! btrees.clear(); } *************** *** 300,328 **** } ! throw new IllegalStateException(NOT_ACTIVE); ! } ! try { /* ! * Validate against the current state of the journal's object index. */ - if( ! validate() ) { - abort(); ! ! throw new RuntimeException("Validation failed: write-write conflict"); ! } ! ! } catch( Throwable t ) { ! ! abort(); ! ! throw new RuntimeException("Unexpected error: "+t, t); ! } --- 345,392 ---- } ! throw new IllegalStateException(NOT_ACTIVE); ! } ! if (!readOnly) { /* ! * The commit timestamp is assigned when we prepare the transaction ! * since the commit protocol does not permit unisolated writes ! * once a transaction begins to prepare until the transaction has ! * either committed or aborted (if such writes were allowed then we ! * would have to re-validate the transaction in order to enforce ! * serializability). ! * ! * @todo resolve this against a service in a manner that will ! * support a distributed database commit protocol.
*/ + commitTimestamp = journal.timestampFactory.nextTimestamp(); + + try { + + /* + * Validate against the current state of the journal's object + * index. + */ + + if (!validate()) { + + abort(); + + throw new RuntimeException( + "Validation failed: write-write conflict"); + + } + + } catch (Throwable t) { abort(); ! ! throw new RuntimeException("Unexpected error: " + t, t); ! } ! } *************** *** 349,353 **** * already complete, then it is aborted. */ ! public void commit() { if( ! isPrepared() ) { --- 413,417 ---- * already complete, then it is aborted. */ ! public long commit() { if( ! isPrepared() ) { *************** *** 363,387 **** } - /* - * Merge each isolated index into the global scope. This also marks the - * slots used by the versions written by the transaction as 'committed'. - * This operation MUST succeed (at a logical level) since we have - * already validated (neither read-write nor write-write conflicts - * exist). - * - * @todo Non-transactional operations on the global scope should be - * either disallowed entirely or locked out during the prepare-commit - * protocol when using transactions since they (a) could invalidate the - * pre-condition for the merge; and (b) uncommitted changes would be - * discarded if the merge operation fails. One solution is to use - * batch operations or group commit mechanism to dynamically create - * transactions from unisolated operations. - */ try { ! mergeOntoGlobalState(); ! // Atomic commit. ! journal.commit(this); runState = RunState.COMMITTED; --- 427,457 ---- } try { ! if(!readOnly) { ! ! /* ! * Merge each isolated index into the global scope. This also ! * marks the slots used by the versions written by the ! * transaction as 'committed'. This operation MUST succeed (at a ! * logical level) since we have already validated (neither ! * read-write nor write-write conflicts exist). ! * ! * @todo Non-transactional operations on the global scope should ! * be either disallowed entirely or locked out during the ! * prepare-commit protocol when using transactions since they ! * (a) could invalidate the pre-condition for the merge; and (b) ! * uncommitted changes would be discarded if the merge operation ! * fails. One solution is to use batch operations or group ! * commit mechanism to dynamically create transactions from ! * unisolated operations. ! */ ! ! mergeOntoGlobalState(); ! // Atomic commit. ! journal.commit(this); ! ! } runState = RunState.COMMITTED; *************** *** 421,424 **** --- 491,496 ---- } + + return getCommitTimestamp(); } *************** *** 449,452 **** --- 521,530 ---- } + final public boolean isReadOnly() { + + return ... [truncated message content] |
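Taken together, the Tx changes above describe a two-phase protocol for a fully isolated read-write transaction: the start timestamp is assigned when the transaction object is created, prepare() assigns the commit timestamp and validates the write set, and commit() merges the isolated indices down onto the global scope and returns the commit timestamp. The following is a minimal usage sketch, not part of the commit, built only from calls that appear in these diffs and the test suites below (Journal, ITx, newTx(), registerIndex(), getIndex(), prepare(), commit()); the index name "demo" and the IIndex.insert(byte[], byte[]) signature are assumptions for illustration.

import java.util.Properties;

import com.bigdata.isolation.UnisolatedBTree;
import com.bigdata.journal.ITx;
import com.bigdata.journal.Journal;
import com.bigdata.objndx.IIndex;

public class TxLifecycleSketch {

    public static void main(String[] args) {

        Journal journal = new Journal(new Properties());

        // Register an index in the unisolated scope; it becomes visible to
        // transactions only once this unisolated commit goes through.
        journal.registerIndex("demo", new UnisolatedBTree(journal, 3)); // index name is hypothetical
        journal.commit();

        // Start a fully isolated read-write transaction; the start timestamp
        // is assigned by the journal's timestamp service.
        ITx tx = journal.newTx();

        // Reads and writes go through an IsolatedBTree buffered by the tx.
        IIndex ndx = tx.getIndex("demo");
        ndx.insert(new byte[] { 1 }, new byte[] { 1 }); // insert signature assumed

        // prepare() assigns the commit timestamp and validates the write set;
        // a write-write conflict would abort the transaction here.
        tx.prepare();

        // commit() merges the write set onto the global scope and returns the
        // commit timestamp assigned during prepare().
        long commitTime = tx.commit();

        System.out.println("committed at " + commitTime);

        journal.close();

    }

}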
Update of /cvsroot/cweb/bigdata/src/test/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv12401/src/test/com/bigdata/journal Modified Files: TestTxRunState.java TestJournal.java TestRootBlockView.java AbstractTestCase.java TestTx.java TestAll.java TestJournalBasics.java Added Files: TestCommitRecordIndex.java TestReadCommittedTx.java TestCommitHistory.java TestConflictResolution.java TestConcurrentSchedules.java TestReadOnlyTx.java AbstractCommitRecordTestCase.java TestName2Addr.java TestCommitRecordSerializer.java TestTxJournalProtocol.java Log Message: Working through transactional isolation. --- NEW FILE: TestCommitHistory.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 16, 2007 */ package com.bigdata.journal; import java.nio.ByteBuffer; import java.util.Properties; /** * Test the ability to get (exact match) and find (most recent less than * or equal to) historical commit records in a {@link Journal}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class TestCommitHistory extends ProxyTestCase { /** * */ public TestCommitHistory() { } /** * @param name */ public TestCommitHistory(String name) { super(name); } /** * Compare two {@link ICommitRecord}s for equality in their data. * * @param expected * @param actual */ public void assertEquals(ICommitRecord expected, ICommitRecord actual) { if (expected == null) assertNull("Expected actual to be null", actual); else assertNotNull("Expected actual to be non-null", actual); assertEquals("timestamp", expected.getTimestamp(), actual.getTimestamp()); assertEquals("#roots", expected.getRootAddrCount(), actual.getRootAddrCount()); final int n = expected.getRootAddrCount(); for(int i=0; i<n; i++) { if(expected.getRootAddr(i) != actual.getRootAddr(i)) { assertEquals("rootAddr[" + i + "]", expected.getRootAddr(i), actual.getRootAddr(i)); } } } /** * Test that {@link Journal#getCommitRecord(long)} returns null if invoked * before anything has been committed. 
*/ public void test_behaviorBeforeAnythingIsCommitted() { Journal journal = new Journal(getProperties()); assertNull(journal.getCommitRecord(journal.timestampFactory.nextTimestamp())); } /** * Test the ability to recover an {@link ICommitRecord} from the * {@link CommitRecordIndex}. */ public void test_recoverCommitRecord() { Journal journal = new Journal(getProperties()); /* * The first commit flushes the root leaves of some indices so we get * back a non-zero commit timestamp. */ assertTrue(0L!=journal.commit()); /* * A follow-up commit in which nothing has been written should return a * 0L timestamp. */ assertEquals(0L,journal.commit()); journal.write(ByteBuffer.wrap(new byte[]{1,2,3})); final long commitTime1 = journal.commit(); assertTrue(commitTime1!=0L); ICommitRecord commitRecord = journal.getCommitRecord(commitTime1); assertNotNull(commitRecord); assertNotNull(journal.getCommitRecord()); assertEquals(commitTime1, journal.getCommitRecord().getTimestamp()); assertEquals(journal.getCommitRecord(),commitRecord); } /** * Tests whether the {@link CommitRecordIndex} is restart-safe. */ public void test_commitRecordIndex_restartSafe() { Properties properties = getProperties(); properties.setProperty(Options.DELETE_ON_CLOSE,"false"); Journal journal = new Journal(properties); if(!journal.isStable()) { // test only applies to restart-safe journals. return; } /* * Write a record directly on the store in order to force a commit to * write a commit record (if you write directly on the store it will not * cause a state change in the root addresses, but it will cause a new * commit record to be written with a new timestamp). */ // write some data. journal.write(ByteBuffer.wrap(new byte[]{1,2,3})); // commit the store. final long commitTime1 = journal.commit(); assertTrue(commitTime1!=0L); ICommitRecord commitRecord1 = journal.getCommitRecord(commitTime1); assertEquals(commitTime1, commitRecord1.getTimestamp()); assertEquals(commitTime1, journal.getRootBlockView().getCommitTimestamp()); /* * Close and then re-open the store and verify that the correct commit * record is returned. */ journal.close(); journal = new Journal(properties); ICommitRecord commitRecord2 = journal.getCommitRecord(); assertEquals(commitRecord1, commitRecord2); /* * Now recover the commit record by searching the commit record index. */ ICommitRecord commitRecord3 = journal.getCommitRecord(commitTime1); assertEquals(commitRecord1, commitRecord3); assertEquals(commitRecord2, commitRecord3); } /** * Tests for finding (less than or equal to) historical commit records using * the commit record index. This also tests restart-safety of the index with * multiple records (if the store is stable). */ public void test_commitRecordIndex_find() { Properties properties = getProperties(); properties.setProperty(Options.DELETE_ON_CLOSE,"false"); Journal journal = new Journal(properties); final int limit = 10; final long[] commitTime = new long[limit]; final long[] commitRecordIndexAddrs = new long[limit]; final ICommitRecord[] commitRecords = new ICommitRecord[limit]; for(int i=0; i<limit; i++) { // write some data. journal.write(ByteBuffer.wrap(new byte[]{1,2,3})); // commit the store.
commitTime[i] = journal.commit(); assertTrue(commitTime[i]!=0L); if (i > 0) assertTrue(commitTime[i] > commitTime[i - 1]); commitRecordIndexAddrs[i] = journal.getRootBlockView().getCommitRecordIndexAddr(); assertTrue(commitRecordIndexAddrs[i]!=0L); if (i > 0) assertTrue(commitRecordIndexAddrs[i] > commitRecordIndexAddrs[i - 1]); // get the current commit record. commitRecords[i] = journal.getCommitRecord(); // test exact match on this timestamp. assertEquals(commitRecords[i],journal.getCommitRecord(commitTime[i])); if(i>0) { // test exact match on the prior timestamp. assertEquals(commitRecords[i-1],journal.getCommitRecord(commitTime[i-1])); } /* * Obtain a unique timestamp from the same source that the journal * is using to generate the commit timestamps. This ensures that * there will be at least one possible timestamp between each commit * timestamp. */ final long ts = journal.timestampFactory.nextTimestamp(); assertTrue(ts>commitTime[i]); } if (journal.isStable()) { /* * Close and then re-open the store so that we will also be testing * restart-safety of the commit record index. */ journal.close(); journal = new Journal(properties); } /* * Verify the historical commit records on exact match (get). */ { for( int i=0; i<limit; i++) { assertEquals(commitRecords[i], journal .getCommitRecord(commitTime[i])); } } /* * Verify access to historical records on LTE search (find). * * We ensured above that there is at least one possible timestamp value * between each pair of commit timestamps. We already verified that * timestamps that exactly match a known commit time return the * associated commit record. * * Now we verify that timestamps which precede a known commit time but * follow after any earlier commit time, return the preceding commit * record (finds the most recent commit record having a commit time less * than or equal to the probe time). */ { for( int i=1; i<limit; i++) { assertEquals(commitRecords[i - 1], journal .getCommitRecord(commitTime[i] - 1)); } /* * Verify a null return if we probe with a timestamp before any * commit time. */ assertNull(journal.getCommitRecord(commitTime[0] - 1)); } } } --- NEW FILE: TestConflictResolution.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web.
Modifications: */ /* * Created on Feb 17, 2007 */ package com.bigdata.journal; /** * Tests of write-write conflict resolution. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class TestConflictResolution extends ProxyTestCase { /** * */ public TestConflictResolution() { // TODO Auto-generated constructor stub } /** * @param name */ public TestConflictResolution(String name) { super(name); // TODO Auto-generated constructor stub } public void test_something() { fail("write tests"); } } Index: TestTxRunState.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestTxRunState.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TestTxRunState.java 13 Feb 2007 23:01:01 -0000 1.1 --- TestTxRunState.java 17 Feb 2007 21:34:12 -0000 1.2 *************** *** 52,55 **** --- 52,58 ---- /** + * Test suite for the state machine governing the transaction {@link RunState} + * transitions. + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ *************** *** 91,95 **** ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getId()); assertTrue( tx0.isActive() ); --- 94,98 ---- ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getStartTimestamp()); assertTrue( tx0.isActive() ); *************** *** 140,144 **** ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getId()); assertTrue( tx0.isActive() ); --- 143,147 ---- ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getStartTimestamp()); assertTrue( tx0.isActive() ); *************** *** 200,204 **** ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getId()); assertTrue( tx0.isActive() ); --- 203,207 ---- ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getStartTimestamp()); assertTrue( tx0.isActive() ); *************** *** 262,266 **** ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getId()); assertTrue( tx0.isActive() ); --- 265,269 ---- ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getStartTimestamp()); assertTrue( tx0.isActive() ); *************** *** 330,334 **** ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getId()); assertTrue( tx0.isActive() ); --- 333,337 ---- ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getStartTimestamp()); assertTrue( tx0.isActive() ); *************** *** 398,402 **** ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getId()); assertTrue( tx0.isActive() ); --- 401,405 ---- ITx tx0 = new Tx(journal,ts0); ! assertEquals(ts0,tx0.getStartTimestamp()); assertTrue( tx0.isActive() ); Index: AbstractTestCase.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/AbstractTestCase.java,v retrieving revision 1.16 retrieving revision 1.17 diff -C2 -d -r1.16 -r1.17 *** AbstractTestCase.java 9 Feb 2007 18:56:59 -0000 1.16 --- AbstractTestCase.java 17 Feb 2007 21:34:12 -0000 1.17 *************** *** 72,94 **** * </p> */ - abstract public class AbstractTestCase extends TestCase2 { - // /** - // * The name of the optional property whose boolean value indicates whether - // * or not {@link #dropStore()} should be invoked before each test - // * (default is "false"). - // */ - // public static final String dropBeforeTest = "dropBeforeTest"; - // - // /** - // * The name of the optional property whose boolean value indicates whether - // * or not {@link #dropStore()} should be invoked before each test - // * (default is "false" ). 
- // */ - // public static final String dropAfterTest = "dropAfterTest"; - // // Constructors. --- 72,79 ---- *************** *** 99,149 **** public AbstractTestCase(String name) {super(name);} - // // - // // Test suite. - // // - // - // /** - // * <p> - // * Appends to the <i>suite</i> the tests defined by this module. - // * </p> - // * - // * @param suite - // * A suite - may be empty. - // */ - // - // public static void addGenericSuite(ProxyTestSuite suite) { - // - // if (suite == null) { - // - // throw new IllegalArgumentException( - // "The ProxyTestSuite may not be null."); - // - // } - // - // /* - // * Create sub-suites (proxy suites) for each package defined in this - // * module. - // */ - //// ProxyTestSuite suite2 = new ProxyTestSuite(suite.getDelegate(),"org.CognitiveWeb.generic"); - //// ProxyTestSuite suite3 = new ProxyTestSuite(suite.getDelegate(),"org.CognitiveWeb.generic.gql"); - //// ProxyTestSuite suite4 = new ProxyTestSuite(suite.getDelegate(),"org.CognitiveWeb.generic.isomorph"); - // - // /* - // * Test suites for the core generic apis. - // */ - // - //// suite2.addTestSuite(IGenericProxyTestCase.class); - // - // /* - // * Combine into the parent test suite using a structure that mirrors the - // * package structure. - // */ - // - //// suite2.addTest(suite3); - //// suite2.addTest(suite4); - //// suite.addTest(suite2); - // - // } - //************************************************************ //************************************************************ --- 84,87 ---- *************** *** 153,161 **** * Invoked from {@link TestCase#setUp()} for each test in the suite. */ - // * This - // * method writes a test header into the log and is responsible for invoking - // * {@link #dropStore()} if the optional boolean property - // * {@link #dropBeforeTest} was specified and has the value "true". - public void setUp(ProxyTestCase testCase) throws Exception { --- 91,94 ---- *************** *** 163,177 **** + ":BEGIN:===================="); - // if (new Boolean(getProperties().getProperty(dropBeforeTest, - // "false")).booleanValue()) { - // - // try { - // dropStore(); - // } catch (Throwable ex) { - // log.error("Could not drop store.", ex); - // } - // - // } - } --- 96,99 ---- *************** *** 179,213 **** * Invoked from {@link TestCase#tearDown()} for each test in the suite. */ - // * This - // * method writes a test trailer into the log and is responsible for invoking - // * {@link #dropStore()} if the optional boolean property - // * {@link #dropAfterTest} was specified and has the value "true". - public void tearDown(ProxyTestCase testCase) throws Exception { - // if (isStoreOpen()) { - // - // log.warn("object manager not closed: test=" + testCase.getName() - // + ", closing now."); - // - // try { - // closeStore(); - // } catch (Throwable ex) { - // log.error("Could not close object manager.", ex); - // } - // - // } - // - // if (new Boolean(getProperties().getProperty(dropAfterTest, - // "false")).booleanValue()) { - // - // try { - // dropStore(); - // } catch (Throwable ex) { - // log.error("Could not drop store.", ex); - // } - // - // } - log.info("\n================:END:" + testCase.getName() + ":END:====================\n"); --- 101,106 ---- *************** *** 236,240 **** * @return A new properties object. 
*/ - public Properties getProperties() { --- 129,132 ---- *************** *** 291,434 **** } - - // // - // // ObjectManager - // // - // - // private IObjectManager m_om; - // - // /** - // * <p> - // * Return the configured object manager. A new object manager will be - // * created if there is not one that is currently configured. - // * </p> - // * - // * @return The configured object manager. - // */ - // public IObjectManager getObjectManager() { - // checkIfProxy(); - // if( m_om == null ) { - // m_om = openStore(); - // } - // return m_om; - // } - // - // /** - // * <p> - // * Closes the current object manager and then opens a new object manager. - // * onto the configured store. - // * </p> - // * - // * @return A new object manager. - // */ - // public IObjectManager reopenStore() { - // checkIfProxy(); - // closeStore(); - // m_om = null; // make sure that this is cleared. - // m_om = openStore(); // make sure that this is re-assigned. - // return m_om; - // } - // - // /** - // * <p> - // * Opens and returns an object manager instance for the configured - // * persistence layer and store. The object manager is configured - // * using the properties returned by {@link #getProperties()}. - // * </p> - // * - // * @return A new object manager instance. - // * - // * @exception IllegalStateException - // * if there is a current object manager. - // */ - // protected IObjectManager openStore() { - // checkIfProxy(); - // if( m_om != null ) { - // throw new IllegalStateException("ObjectManager exists."); - // } - // m_om = ObjectManagerFactory.INSTANCE.newInstance(getProperties()); - // return m_om; - // } - // - // /** - // * <p> - // * Closes the current object manager. - // * </p> - // * - // * @exception IllegalStateException - // * if the object manager is not configured. - // */ - // protected void closeStore() { - // checkIfProxy(); - // if( m_om == null ) { - // throw new IllegalStateException("ObjectManager does not exist."); - // } - // m_om.close(); - // m_om = null; - // } - // - // /** - // * <p> - // * Return true iff the object manager exists. - // * </p> - // * - // * @return True if there is a configured object manager. - // */ - // protected boolean isStoreOpen() { - // checkIfProxy(); - // return m_om != null; - // } - // - // /** - // * <p> - // * Drop the configured database in use for the tests. This method may be - // * automatically invoked before and/or after each test by declaring the - // * appropriate property. - // * </p> - // * <p> - // * The semantics of "drop" depend on the GOM implementation and persistence - // * layer under test and can range from clearing a transient object manager, - // * to deleting the files corresponding to the store on disk, to dropping a - // * test database in a federation. As a rule, the object manager must be - // * closed when this method is invoked. - // * </p> - // * - // * @see #dropBeforeTest - // * @see #dropAfterTest - // */ - // abstract protected void dropStore(); - // - // // - // // Unique property name factory. - // // - // - // private Random r = new Random(); - // - // /** - // * <p> - // * A property name is derived from the test name plus a random integer to - // * avoid side effects. The store is NOT tested to verify that this property - // * name is unique, so it is possible that tests will occasionally fail - // * through rare collisions with existing property names. - // * </p> - // * <p> - // * Note: This method also gets used to generate unique association names and - // * object names. 
- // * </p> - // */ - // protected String getUniquePropertyName() { - // return getName()+"-"+r.nextInt(); - // } - // - // /** - // * Similar to {@link #getUniquePropertyName()} but embeds <i>name</i> in - // * the returned value. - // * - // * @see #getUniquePropertyName() - // */ - // protected String getUniquePropertyName(String name) { - // return getName()+"-"+name+"-"+r.nextInt(); - // } - //************************************************************ --- 183,186 ---- *************** *** 609,789 **** } - // /** - // * Test helper verifies that the data is deleted. - // */ - // public void assertDeleted(IStore store, int id) { - // - // try { - // - // store.read(id, null); - // - // fail("Expecting " + DataDeletedException.class); - // - // } catch (DataDeletedException ex) { - // - // System.err.println("Ignoring expected exception: " + ex); - // - // } - // - // } - - // /** - // * Test helper checks for the parameter for the semantics of "not found" as - // * defined by {@link IStore#read(int, ByteBuffer)}. - // * - // * @param actual - // * The value returned by either of those methods. - // */ - // public void assertNotFound(ByteBuffer actual) { - // - // assertNull("Expecting 'not found'", actual); - // - // } - - // /** - // * Test the version counter for a persistent identifier in the global scope. - // * - // * @param journal - // * The journal. - // * @param id - // * The int32 within segment persistent identifier. - // * @param expectedVersionCounter - // * The expected value of the version counter. - // * - // * @exception AssertionFailedError - // * if the persistent identifier is not found in the global - // * object index. - // * @exception AssertionFailedError - // * if the identifer is found, but the version counter value - // * differs from the expected version counter. - // */ - // protected void assertVersionCounter(Journal journal, int id, long expectedVersionCounter ) { - // - // // FIXME hardwired to SimpleObjectIndex. - // IObjectIndexEntry entry = ((SimpleObjectIndex)journal.objectIndex).objectIndex.get(id); - // - // if( entry == null ) fail("No entry in journal: id="+id); - // - // assertEquals("versionCounter", expectedVersionCounter, entry.getVersionCounter() ); - // - // } - // - // /** - // * Test the version counter for a persistent identifier in the transaction - // * scope. - // * - // * @param tx - // * The transaction. - // * @param id - // * The int32 within segment persistent identifier. - // * @param expectedVersionCounter - // * The expected value of the version counter. - // * @exception AssertionFailedError - // * if the persistent identifier is not found in the - // * transaction's outer object index (this test does NOT read - // * through to the inner index so you MUST NOT invoke it - // * before the version has been overwritten by the - // * transaction). - // * @exception AssertionFailedError - // * if the identifer is found, but the version counter value - // * differs from the expected version counter. - // */ - // protected void assertVersionCounter(Tx tx, int id, int expectedVersionCounter ) { - // - // // FIXME hardwired to SimpleObjectIndex. 
- // IObjectIndexEntry entry = ((SimpleObjectIndex)tx.getObjectIndex()).objectIndex.get(id); - // - //// IObjectIndexEntry entry = tx.getObjectIndex().objectIndex.get(id); - // - // if( entry == null ) fail("No entry in transaction: tx="+tx+", id="+id); - // - // assertEquals("versionCounter", (short) expectedVersionCounter, entry.getVersionCounter() ); - // - // } - - // /** - // * Write a data version consisting of N random bytes and verify that we can - // * read it back out again. - // * - // * @param store - // * The store. - // * @param id - // * The int32 within-segment persistent identifier. - // * @param nbytes - // * The data version length. - // * - // * @return The data written. This can be used to re-verify the write after - // * intervening reads. - // */ - // - // protected byte[] doWriteRoundTripTest(IStore store, int id, int nbytes) { - // - // System.err.println("Test writing: id="+id+", nbytes="+nbytes); - // - // byte[] expected = new byte[nbytes]; - // - // r.nextBytes(expected); - // - // ByteBuffer data = ByteBuffer.wrap(expected); - // - //// assertNull((tx == null ? journal.objectIndex.getSlots(id) - //// : tx.getObjectIndex().getSlots(id))); - // - // store.write(id,data); - // assertEquals("limit() != #bytes", expected.length, data.limit()); - // assertEquals("position() != limit()",data.limit(),data.position()); - // - //// ISlotAllocation slots = (tx == null ? journal.objectIndex.getSlots(id) - //// : tx.getObjectIndex().getSlots(id)); - //// assertEquals("#bytes",nbytes,slots.getByteCount()); - //// assertEquals("#slots",journal.slotMath.getSlotCount(nbytes),slots.getSlotCount()); - //// assertEquals(firstSlot,tx.objectIndex.getFirstSlot(id)); - // - // /* - // * Read into a buffer allocated by the Journal. - // */ - // ByteBuffer actual = store.read(id, null); - // - // assertEquals("acutal.position()",0,actual.position()); - // assertEquals("acutal.limit()",expected.length,actual.limit()); - // assertEquals("limit() - position() == #bytes",expected.length,actual.limit() - actual.position()); - // assertEquals(expected,actual); - // - // /* - // * Read multiple copies into a buffer that we allocate ourselves. - // */ - // final int ncopies = 7; - // int pos = 0; - // actual = ByteBuffer.allocate(expected.length * ncopies); - // for( int i=0; i<ncopies; i++ ) { - // - // /* - // * Setup to read into the next slice of our buffer. - // */ - //// System.err.println("reading @ i="+i+" of "+ncopies); - // pos = i * expected.length; - // actual.limit( actual.capacity() ); - // actual.position( pos ); - // - // ByteBuffer tmp = store.read(id, actual); - // assertTrue("Did not read into the provided buffer", tmp == actual); - // assertEquals("position()", pos, actual.position() ); - // assertEquals("limit() - position()", expected.length, actual.limit() - actual.position()); - // assertEquals(expected,actual); - // - // /* - // * Attempt to read with insufficient remaining bytes in the buffer - // * and verify that the data are read into a new buffer. - // */ - // actual.limit(pos+expected.length-1); - // tmp = store.read(id, actual); - // assertFalse("Read failed to allocate a new buffer", tmp == actual); - // assertEquals(expected,tmp); - // - // } - // - // return expected; - // - // } - } --- 361,363 ---- --- NEW FILE: TestCommitRecordSerializer.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. 
Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 13, 2007 */ package com.bigdata.journal; import java.nio.ByteBuffer; import java.util.Random; import junit.framework.TestCase2; /** * Test suite for {@link CommitRecordSerializer}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class TestCommitRecordSerializer extends AbstractCommitRecordTestCase { /** * */ public TestCommitRecordSerializer() { } /** * @param arg0 */ public TestCommitRecordSerializer(String arg0) { super(arg0); } public void test_stress() { final int ntrials = 1000; for(int trial=0;trial<ntrials; trial++) { doRoundTripTest(getRandomCommitRecord()); } } public void doRoundTripTest(ICommitRecord roots) { CommitRecordSerializer ser = CommitRecordSerializer.INSTANCE; assertEquals(roots, ser.deserialize(ByteBuffer.wrap(ser .serialize(roots)))); } } --- NEW FILE: AbstractCommitRecordTestCase.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. 
Modifications: */ /* * Created on Feb 16, 2007 */ package com.bigdata.journal; import java.util.Random; import junit.framework.TestCase; /** * Defines some helper methods for testing {@link ICommitRecord}s. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ abstract public class AbstractCommitRecordTestCase extends TestCase { public AbstractCommitRecordTestCase() { } public AbstractCommitRecordTestCase(String name) { super(name); } Random r = new Random(); /** * Compare two {@link ICommitRecord}s for equality in their data. * * @param expected * @param actual */ public void assertEquals(ICommitRecord expected, ICommitRecord actual) { assertEquals("timestamp", expected.getTimestamp(), actual.getTimestamp()); assertEquals("#roots", expected.getRootAddrCount(), actual.getRootAddrCount()); final int n = expected.getRootAddrCount(); for(int i=0; i<n; i++) { assertEquals("rootAddrs", expected.getRootAddr(i), actual.getRootAddr(i)); } } public ICommitRecord getRandomCommitRecord() { final long timestamp = System.currentTimeMillis(); final int n = ICommitRecord.MAX_ROOT_ADDRS; long[] roots = new long[n]; for(int i=0; i<n; i++) { boolean empty = r.nextInt(100)<30; roots[i] = empty ? 0L : r.nextInt(Integer.MAX_VALUE); } return new CommitRecord(timestamp,roots); } } Index: TestJournal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestJournal.java,v retrieving revision 1.25 retrieving revision 1.26 diff -C2 -d -r1.25 -r1.26 *** TestJournal.java 8 Feb 2007 21:32:09 -0000 1.25 --- TestJournal.java 17 Feb 2007 21:34:12 -0000 1.26 *************** *** 101,103 **** --- 101,109 ---- } + public void test_something() { + + fail("write tests"); + + } + } Index: TestJournalBasics.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestJournalBasics.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** TestJournalBasics.java 13 Feb 2007 23:01:01 -0000 1.7 --- TestJournalBasics.java 17 Feb 2007 21:34:13 -0000 1.8 *************** *** 84,88 **** * @see ProxyTestSuite */ - public static Test suite() { --- 84,87 ---- *************** *** 90,102 **** TestSuite suite = new TestSuite("Core Journal Test Suite"); ! // @todo basic journal tests. ! // suite.addTestSuite( TestJournal.class ); ! // tests of creation, loolup, use, commit of named indices. suite.addTestSuite( TestNamedIndices.class ); ! // @todo transactional isolation tests. suite.addTestSuite( TestTxRunState.class ); suite.addTestSuite( TestTx.class ); return suite; --- 89,118 ---- TestSuite suite = new TestSuite("Core Journal Test Suite"); ! // @todo basic journal tests (none are defined yet). ! suite.addTestSuite( TestJournal.class ); ! // tests of creation, lookup, use, commit of named indices. suite.addTestSuite( TestNamedIndices.class ); ! // tests the ability to recover and find historical commit records. ! suite.addTestSuite( TestCommitHistory.class ); ! ! /* ! * tests of transaction support. ! */ ! // tests of transitions in the transaction RunState state machine. suite.addTestSuite( TestTxRunState.class ); + // @todo update these tests of the tx-journal integration. + suite.addTestSuite( TestTxJournalProtocol.class ); + // @todo tests of read-write transactions. suite.addTestSuite( TestTx.class ); + // @todo tests of read-only transactions. 
+ suite.addTestSuite( TestReadOnlyTx.class ); + // @todo tests of read-committed transactions. + suite.addTestSuite( TestReadCommittedTx.class ); + // @todo tests of concurrent schedules and conflict detection. + suite.addTestSuite( TestConcurrentSchedules.class ); + // @todo tests of write-write conflict resolution. + suite.addTestSuite(TestConflictResolution.class); return suite; Index: TestTx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/test/com/bigdata/journal/TestTx.java,v retrieving revision 1.15 retrieving revision 1.16 diff -C2 -d -r1.15 -r1.16 *** TestTx.java 13 Feb 2007 23:01:01 -0000 1.15 --- TestTx.java 17 Feb 2007 21:34:12 -0000 1.16 *************** *** 48,60 **** package com.bigdata.journal; - import java.io.IOException; import java.util.Properties; /** ! * Test suite for transaction isolation with respect to the underlying journal. ! * The tests in this suite are designed to verify isolation of changes within ! * the scope of the transaction when compared to the last committed state of the ! * journal. This basically amounts to verifying that operations read through the ! * transaction scope object index into the journal scope object index. * * @todo Do stress test with writes, reads, and deletes. --- 48,66 ---- package com.bigdata.journal; import java.util.Properties; + import com.bigdata.isolation.IsolatedBTree; + import com.bigdata.isolation.UnisolatedBTree; + import com.bigdata.objndx.IIndex; + /** ! * Test suite for fully-isolated read-write transactions. ! * ! * @todo Test suite for transaction isolation with respect to the underlying ! * journal. The tests in this suite are designed to verify isolation of ! * changes within the scope of the transaction when compared to the last ! * committed state of the journal. This basically amounts to verifying ! * that operations read through the transaction scope object index into ! * the journal scope object index. * * @todo Do stress test with writes, reads, and deletes. *************** *** 108,113 **** * transaction starts or - to enforce the data type specificity at the * risk of tighter integration of components - as part of the schema ! * declaration. Declare IConflictResolver that either merges ! * state into object in the transaction or causes the tx to abort. * * @todo Verify correct abort after 'prepare'. --- 114,119 ---- * transaction starts or - to enforce the data type specificity at the * risk of tighter integration of components - as part of the schema ! * declaration. Declare IConflictResolver that either merges state into ! * object in the transaction or causes the tx to abort. * * @todo Verify correct abort after 'prepare'. *************** *** 123,129 **** public class TestTx extends ProxyTestCase { - /** - * - */ public TestTx() { } --- 129,132 ---- *************** *** 134,227 **** /** ! * Test verifiers that duplicate transaction identifiers are detected in the ! * case where the first transaction is active. */ ! public void test_duplicateTransactionIdentifiers01() throws IOException { ! final Properties properties = getProperties(); ! try { ! Journal journal = new Journal(properties); ! Tx tx0 = new Tx(journal,0); ! try { ! // Try to create another transaction with the same identifier. ! new Tx(journal,0); ! ! fail( "Expecting: "+IllegalStateException.class); ! ! } ! catch( IllegalStateException ex ) { ! ! System.err.println("Ignoring expected exception: "+ex); ! ! } ! tx0.abort(); ! journal.close(); ! } finally { ! deleteTestJournalFile(); ! 
} } ! /** ! * Test verifiers that duplicate transaction identifiers are detected in the ! * case where the first transaction has already prepared. ! * ! * @todo The {@link Journal} does not maintain a collection of committed ! * transaction identifier for transactions that have already ! * committed. However, it might make sense to maintain a transient ! * collection that is rebuilt on restart of those transactions that ! * are waiting for GC. Also, it may be possible to summarily reject ! * transaction identifiers if they are before a timestamp when a ! * transaction service has notified the journal that no active ! * transactions remain before that timestamp. If those modifications ! * are made, then add the appropriate tests here. */ ! public void test_duplicateTransactionIdentifiers02() throws IOException { ! final Properties properties = getProperties(); ! try { ! Journal journal = new Journal(properties); ! ITx tx0 = new Tx(journal,0); ! tx0.prepare(); try { ! // Try to create another transaction with the same identifier. ! new Tx(journal,0); ! fail( "Expecting: "+IllegalStateException.class); - } - - catch( IllegalStateException ex ) { - System.err.println("Ignoring expected exception: "+ex); } - - tx0.abort(); ! journal.close(); ! ! } finally { ! ! deleteTestJournalFile(); ! ! } } --- 137,434 ---- /** ! * Test verifies that a transaction may start when there are (a) no commits ! * on the journal; and (b) no indices have been registered. ! * ! * @todo In the current implementation the transaction will be unable to ! * isolate an index if the index has not been registered already by an ! * unisolated transaction. */ ! public void test_noIndicesRegistered() { ! ! Properties properties = getProperties(); ! ! Journal journal = new Journal(properties); ! ! journal.commit(); ! ITx tx = journal.newTx(); ! ! /* ! * nothing written on this transaction. ! */ ! ! tx.prepare(); ! ! // commit. ! assertTrue(tx.commit()!=0L); ! journal.close(); ! ! } ! /** ! * Verify that an index is not visible in a tx unless the native ! * transaction in which it was registered had already committed before ! * the tx started. ! */ ! public void test_indexNotVisibleUnlessCommitted() { ! ! Properties properties = getProperties(); ! Journal journal = new Journal(properties); ! String name = "abc"; ! ! // register index in unisolated scope, but do not commit yet. ! journal.registerIndex(name, new UnisolatedBTree(journal, 3)); ! ! // start tx1. ! ITx tx1 = journal.newTx(); ! // the index is not visible in tx1. ! assertNull(tx1.getIndex(name)); ! ! // do unisolated commit. ! assertTrue(journal.commit()!=0L); ! ! // start tx2. ! ITx tx2 = journal.newTx(); ! ! // the index is still not visible in tx1. ! assertNull(tx1.getIndex(name)); ! ! // the index is visible in tx2. ! assertNotNull(tx2.getIndex(name)); ! ! tx1.abort(); ! ! tx2.abort(); ! ! journal.close(); ! ! } ! ! /** ! * Create a journal, set up an index, write an entry on that index, and ! * commit the store. Set up a transaction and verify that we can isolate ! * that index and read the written value. Write a value on the unisolated ! * index and verify that it is not visible within the transaction. ! */ ! public void test_readIsolation() { ! ! Properties properties = getProperties(); ! ! Journal journal = new Journal(properties); ! ! final String name = "abc"; ! ! final int branchingFactor = 3; ! ! final byte[] k1 = new byte[]{1}; ! final byte[] k2 = new byte[]{2}; ! ! final byte[] v1 = new byte[]{1}; ! final byte[] v2 = new byte[]{2}; ! ! { ! ! /* !
* register the index, write an entry on the unisolated index, ! * and commit the journal. ! */ ! IIndex index = journal.registerIndex(name, new UnisolatedBTree( ! journal, branchingFactor)); ! ! assertNull(index.insert(k1, v1)); ! assertTrue(journal.commit()!=0L); ! } ! ! ITx tx1 = journal.newTx(); ! ! { ! /* ! * verify that the write is visible in a transaction that starts ! * after the commit. ! */ ! ! IIndex index = tx1.getIndex(name); ! ! assertTrue(index.contains(k1)); ! ! assertEquals(v1,(byte[])index.lookup(k1)); ! ! } ! { ! ! /* ! * obtain the unisolated index and write another entry and commit ! * the journal. ! */ ! ! IIndex index = journal.getIndex(name); ! ! assertNull(index.insert(k2, v2)); ! ! assertTrue(journal.commit()!=0L); ! ! } ! ! { ! ! /* ! * verify that the entry written on the unisolated index is not ! * visible to the transaction that started before that write. ! */ ! IIndex index = tx1.getIndex(name); ! ! assertTrue(index.contains(k1)); ! assertFalse(index.contains(k2)); ! ! } ! ! ITx tx2 = journal.newTx(); ! ! { ! ! /* ! * start another transaction and verify that the second committed ! * write is now visible to that transaction. ! */ ! ! IIndex index = tx2.getIndex(name); ! ! assertTrue(index.contains(k1)); ! assertTrue(index.contains(k2)); ! ! } ! ! tx1.abort(); ! ! journal.close(); } ! /** ! * Test verifies that an isolated write is visible inside a transaction ! * (tx1) but not in a concurrent transaction (tx2) and not in the unisolated ! * index until tx1 commits. Once tx1 commits, the write is visible ! * in the unisolated index. The write never becomes visible in tx2. If tx2 ! * attempts to write a value under the same key then a write-write conflict ! * is reported and validation fails. */ ! public void test_writeIsolation() { ! Properties properties = getProperties(); ! Journal journal = new Journal(properties); ! ! final String name = "abc"; ! ! final int branchingFactor = 3; ! ! final byte[] k1 = new byte[]{1}; ! final byte[] v1 = new byte[]{1}; ! final byte[] v1a = new byte[]{1,1}; ! ! { ! /* ! * register an index and commit the journal. ! */ ! ! journal.registerIndex(name, new UnisolatedBTree(journal, ! branchingFactor)); ! ! assertTrue(journal.commit()!=0L); ! ! } ! /* ! * create two transactions. ! */ ! ! ITx tx1 = journal.newTx(); ! ! ITx tx2 = journal.newTx(); ! ! { ! ! /* ! * write an entry in tx1. verify that the entry is not visible ! * in the unisolated index or in the index as isolated by tx2. ! */ ! ! IsolatedBTree ndx1 = (IsolatedBTree)tx1.getIndex(name); ! ! assertFalse(ndx1.contains(k1)); + assertNull(ndx1.insert(k1,v1)); + + // check the version counter in tx1. + assertEquals("versionCounter", 0, ndx1.getValue(k1) + .getVersionCounter()); + + // not visible in the other tx. + assertFalse(tx2.getIndex(name).contains(k1)); + + // not visible in the unisolated index. + assertFalse(journal.getIndex(name).contains(k1)); + + /* + * commit tx1. verify that the write is still not visible in tx2 but + * that it is now visible in the unisolated index. + */ + + // prepare tx1. + tx1.prepare(); + + // commit tx1. + assertTrue(tx1.commit()!=0L); + + // still not visible in the other tx. + assertFalse(tx2.getIndex(name).contains(k1)); + + // but now visible in the unisolated index. + assertTrue(journal.getIndex(name).contains(k1)); + + // check the version counter in the unisolated index.
+ assertEquals("versionCounter", 1, ((UnisolatedBTree) journal + .getIndex(name)).getValue(k1).getVersionCounter()); + + /* + * write a conflicting entry in tx2 and verify that validation of + * tx2 fails. + */ + + assertNull(tx2.getIndex(name).insert(k1,v1a)); + + // check the version counter in tx2. + assertEquals("versionCounter", 0, ((IsolatedBTree) tx2 + .getIndex(name)).getValue(k1).getVersionCounter()); + try { ! tx2.prepare(); ! fail("Expecting: "+IllegalStateException.class); ! ! } catch(IllegalStateException ex) { System.err.println("Ignoring expected exception: "+ex); } ! assertTrue(tx2.isAborted()); ! ! } ! ! journal.close(); } --- NEW FILE: TestTxJournalProtocol.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 16, 2007 */ package com.bigdata.journal; import java.io.IOException; import java.util.Properties; /** * Test suite for the integration of the {@link Journal} and the {@link ITx} * implementations. * * @todo the tests in this suite are stale and need to be reviewed, possibly * revised or replaced, and certainly extended. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class TestTxJournalProtocol extends ProxyTestCase { public TestTxJournalProtocol() { } public TestTxJournalProtocol(String name) { super(name); } /** * Test verifies that duplicate transaction identifiers are detected in the * case where the first transaction is active. */ public void test_duplicateTransactionIdentifiers01() throws IOException { final Properties properties = getProperties(); try { Journal journal = new Journal(properties); Tx tx0 = new Tx(journal,0); try { // Try to create another transaction with the same identifier. new Tx(journal,0); fail( "Expecting: "+IllegalStateException.class); } catch( IllegalStateException ex ) { System.err.println("Ignoring expected exception: "+ex); } tx0.abort(); journal.close(); } finally { deleteTestJournalFile(); } } /** * Test verifies that duplicate transaction identifiers are detected in the * case where the first transaction has already prepared. * * @todo The {@link Journal} does not maintain a collection of committed * transaction identifier for transactions that have already * committed. 
However, it might make sense to maintain a transient * collection that is rebuilt on restart of those transactions that * are waiting for GC. Also, it may be possible to summarily reject * transaction identifiers if they are before a timestamp when a * transaction service has notified the journal that no active * transactions remain before that timestamp. If those modifications * are made, then add the appropriate tests here. */ public void test_duplicateTransactionIdentifiers02() throws IOException { final Properties properties = getProperties(); try { Journal journal = new Journal(properties); ITx tx0 = new Tx(journal,0); tx0.prepare(); try { // Try to create another transaction with the same identifier. new Tx(journal,0); fail( "Expecting: "+IllegalStateException.class); } catch( IllegalStateException ex ) { System.err.println("Ignoring expected exception: "+ex); ... [truncated message content]
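The isolation asserted by test_readIsolation and test_writeIsolation above follows one pattern: a transaction reads through a private write set into a snapshot of the last committed state, and nothing it writes is visible elsewhere until it commits. The sketch below is a minimal model of that pattern only; the class and method names are hypothetical stand-ins and do not correspond to the com.bigdata.journal API.

import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of the snapshot-isolation semantics exercised by the tests
 * above. Illustrative only; not the bigdata implementation.
 */
public class SnapshotIsolationSketch {

    // The committed (unisolated) state of a single index.
    private final Map<String, String> committed = new HashMap<String, String>();

    // A transaction reads from a snapshot taken when it starts and buffers
    // its own writes; nothing is visible elsewhere until commit().
    public class Tx {

        private final Map<String, String> snapshot =
                new HashMap<String, String>(committed);

        private final Map<String, String> writeSet =
                new HashMap<String, String>();

        public String read(String key) {
            // A tx sees its own writes, otherwise its snapshot.
            return writeSet.containsKey(key) ? writeSet.get(key)
                    : snapshot.get(key);
        }

        public void write(String key, String value) {
            writeSet.put(key, value);
        }

        public void commit() {
            // Copy the write set down onto the committed state.
            committed.putAll(writeSet);
        }
    }

    public static void main(String[] args) {
        SnapshotIsolationSketch store = new SnapshotIsolationSketch();

        Tx tx1 = store.new Tx();
        Tx tx2 = store.new Tx();

        tx1.write("k1", "v1");

        // Visible in tx1, but not in tx2 and not in the unisolated view.
        System.out.println(tx1.read("k1"));              // v1
        System.out.println(tx2.read("k1"));              // null
        System.out.println(store.committed.get("k1"));   // null

        tx1.commit();

        // Now visible in the unisolated view, but still never in tx2.
        System.out.println(store.committed.get("k1"));   // v1
        System.out.println(tx2.read("k1"));              // null
    }
}

Matching test_writeIsolation, a real implementation also needs a write-write conflict check at commit time (omitted here); that check is what keeps a tx2-style writer from silently clobbering tx1's committed write.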
From: Bryan T. <tho...@us...> - 2007-02-17 21:34:30
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/isolation In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv12401/src/java/com/bigdata/isolation Modified Files: IsolatedBTree.java IValue.java UnisolatedBTree.java Log Message: Working through transactional isolation. Index: UnisolatedBTree.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/isolation/UnisolatedBTree.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** UnisolatedBTree.java 15 Feb 2007 22:01:18 -0000 1.4 --- UnisolatedBTree.java 17 Feb 2007 21:34:21 -0000 1.5 *************** *** 112,117 **** public class UnisolatedBTree extends BTree implements IIsolatableIndex { protected final IConflictResolver conflictResolver; ! /** * The delegate that handles write-write conflict resolution during backward --- 112,128 ---- public class UnisolatedBTree extends BTree implements IIsolatableIndex { + /** + * The optional conflict resolver. + */ protected final IConflictResolver conflictResolver; ! ! /** ! * True iff this is an instance of {@link IIsolatedIndex}. This affects ! * which value we use for the version counter in ! * {@link #insert(Object, Object)} when there is no pre-existing entry for a ! * key. ! */ ! final boolean isIsolated = this instanceof IsolatedBTree; ! /** * The delegate that handles write-write conflict resolution during backward *************** *** 137,140 **** --- 148,163 ---- * @param store * @param branchingFactor + */ + public UnisolatedBTree(IRawStore store, int branchingFactor) { + + this(store,branchingFactor, null); + + } + + /** + * Create an unisolated btree. + * + * @param store + * @param branchingFactor * @param conflictResolver * An optional object that handles write-write conflict *************** *** 203,206 **** --- 226,245 ---- /** + * This method breaks isolation to return the {@link Value} for a key. It + * is used by {@link IsolatedBTree#validate(UnisolatedBTree)} to test + * version counters when a key already exists in the global scope. + * + * @todo make protected and refactor tests so that we do not need public + * access to this method. There should be tests in this package that + * examine the specific version counters that are assigned such that + * we do not need to expose this method as public. + */ + final public Value getValue(byte[] key) { + + return (Value) super.lookup(key); + + } + + /** * True iff the key does not exist or if it exists but is marked as * {@link IValue#isDeleted()}. *************** *** 216,222 **** throw new IllegalArgumentException(); ! Value value = (Value)super.lookup(key); ! ! if(value==null||value.deleted) return false; return true; --- 255,262 ---- throw new IllegalArgumentException(); ! Value value = (Value) super.lookup(key); ! ! if (value == null || value.deleted) ! return false; return true; *************** *** 241,247 **** throw new IllegalArgumentException(); ! Value value = (Value)super.lookup(key); ! if(value==null||value.deleted) return null; return value.datum; --- 281,288 ---- throw new IllegalArgumentException(); ! Value value = (Value) super.lookup(key); ! if (value == null || value.deleted) ! return null; return value.datum; *************** *** 276,283 **** /** ! * If the key does not exists or if the key exists, then insert/update an ! * entry under that key with a new version counter. Otherwise, update the ! * entry under that key in order to increment the version counter (this !
* includes the case where the key is paired with a deletion marker). * * @param key --- 317,325 ---- /** ! * If no entry exists for the key (not even a deleted entry), then insert ! * an entry under that key with a first version counter. Otherwise, update ! * the entry under that key in order to increment the version counter (this ! * includes the case where the key is paired with ! * a deletion marker). * * @param key *************** *** 298,303 **** if (value == null) { ! super.insert(key, new Value(IValue.FIRST_VERSION_UNISOLATED, false, ! (byte[]) val)); return null; --- 340,351 ---- if (value == null) { ! /* ! * No entry exists for that key (not even a deleted entry). ! */ ! ! short versionCounter = isIsolated ? IValue.FIRST_VERSION_ISOLATED ! : IValue.FIRST_VERSION_UNISOLATED; ! ! super.insert(key, new Value(versionCounter, false, (byte[]) val)); return null; *************** *** 305,308 **** --- 353,362 ---- } + /* + * An entry exists for that key (the entry may be marked as deleted, but + * that does not affect the behavior of insert). We assign the next + * version counter to the entry, clear the deleted flag, and set the new + * value on the entry. + */ super.insert(key, new Value(value.nextVersionCounter(), false, (byte[]) val)); Index: IValue.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/isolation/IValue.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** IValue.java 13 Feb 2007 23:01:02 -0000 1.1 --- IValue.java 17 Feb 2007 21:34:21 -0000 1.2 *************** *** 50,55 **** * and version counters. * <p> ! * Deletion markers are required by both transactions (so that the entry may be ! * removed from the unisolated tree when the transaction commits) and * partitioned indices (so that a key deleted in an unisolated index may be * removed during a compacting merge with existing index segments). --- 50,55 ---- * and version counters. * <p> ! * Deletion markers are required both by transactions (so that the entry may be ! * removed from the unisolated tree when the transaction commits) and by * partitioned indices (so that a key deleted in an unisolated index may be * removed during a compacting merge with existing index segments). *************** *** 59,72 **** * tree. When a transaction isolates an index it does so by creating a fused * view that reads from a historical committed state of the corresponding ! * unisolated index and writes on an isolated index. When a value is written on ! * in the isolated index, the version counter from the unisolated index is ! * copied into the isolated index. When the transaction commits, it validates ! * the writes by testing the version counter in the then current unisolated ! * index. If the version counter for an entry in the isolated index does not ! * agree with the version counter on the unisolated index then an intervening ! * commit has already overwritten that entry and a write-write conflict exists. ! * Either the write-write conflict can be resolved or the transaction must ! * abort. In the special case where there are no intervening commits since the ! * transaction began validation is unecessary and should be skipped. * <p> * A delete is handled as a write that sets a "deleted" flag. Eventually the --- 59,73 ---- * and version counters. * <p> ! 
* unisolated index and writes on an isolated index. The first time a value is ! * written on the isolated index for which there is a pre-existing value in the ! * unisolated index, the version counter from the unisolated index is copied ! * into the isolated index. When the transaction commits, it validates writes by ! * testing the version counter in the <em>then current</em> unisolated index. ! * If the version counter for an entry in the isolated index does not agree with ! * the version counter on the unisolated index, then an intervening commit has ! * already overwritten that entry and a write-write conflict exists. Either the ! * write-write conflict can be resolved or the transaction must abort. In the ! * special case where there are no intervening commits since the transaction ! * began, validation is unnecessary and should be skipped. * <p> * A delete is handled as a write that sets a "deleted" flag. Eventually the *************** *** 90,93 **** --- 91,118 ---- * The value of the version counter that is used the first time a data * version is written for a key in an unisolated index: ONE (1). + * <p> + * Note: The first version counter written on the unisolated index (1) is + * larger than the {@link #FIRST_VERSION_ISOLATED first version counter} + * written on an isolated index (0). The reason is that this approach + * essentially presumes a version counter of zero (0) if a key does not + * exist in the unisolated index, which is then copied into the isolated + * index as a zero (0) version counter. + * <p> + * If the transaction validates and the isolated write is copied onto the + * unisolated index then the version counter is handled in one of the + * following ways. + * <ol> + * <li> If there is still no entry under the key, then the + * {@link #FIRST_VERSION_UNISOLATED} version counter (1) is used and we have + * effectively incremented from a zero (0) version counter for a + * non-existing key to a one (1) version counter on the first write.</li> + * <li> If another transaction has committed, then the version counter in + * the global scope will now be GTE one (1). This will cause a write-write + * conflict to be detected during validation. If the write-write conflict is + * resolved, then the copy down onto the global scope will assign the next + * version counter, e.g., the new version counter will be two (2) (if there + * was only one intervening commit) or greater (if there was more than one + * intervening commit).</li> + * </ol> */ public static short FIRST_VERSION_UNISOLATED = (short) 1; Index: IsolatedBTree.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/isolation/IsolatedBTree.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** IsolatedBTree.java 15 Feb 2007 22:01:18 -0000 1.3 --- IsolatedBTree.java 17 Feb 2007 21:34:21 -0000 1.4 *************** *** 135,147 **** * Typically, it is the unisolated btree from the last committed * state on the {@link Journal} before the {@link Tx} starts. - * - * FIXME The isolated btree needs to start from the committed stable state - * of another btree (a possible exception is the first transaction to create - * a given btree). In order to verify that the source btree meets those - * requirements we need to know that it was loaded from a historical - * metadata record, e.g., as found in a root block or a read-only root names - * index found in a root block. 
Merely having a persistent root is NOT - * enough since just writing the tree onto the store does not make it - * restart safe. The {@link Tx} class needs to handle this. */ public IsolatedBTree(IRawStore store, UnisolatedBTree src) { --- 135,138 ---- *************** *** 170,173 **** --- 161,169 ---- * @param metadata * @param src + * + * @todo This constructor is somewhat different since it requires access to + * a persistence capable parameter in order to reconstruct the view. + * Consider whether or not this can be refactored per + * {@link BTreeMetadata#load(IRawStore, long)}. */ public IsolatedBTree(IRawStore store, BTreeMetadata metadata, UnisolatedBTree src) { *************** *** 254,261 **** * write through to the isolated index). */ ! public Object insert(Object key,Object value) { ! ! return super.insert(key,value); ! } --- 250,255 ---- * write through to the isolated index). */ ! public Object insert(Object key, Object val) { ! return super.insert(key,val); } *************** *** 405,413 **** * scanning both indices in order. * ! * Note: we must use the implementation of this method on the super ! * class in order to visit the IValue objects and see both deleted and ! * undeleted entries. */ ! final IEntryIterator itr = super.entryIterator(); while (itr.hasNext()) { --- 399,406 ---- * scanning both indices in order. * ! * Note: the iterator is chosen carefully in order to visit the IValue ! * objects and see both deleted and undeleted entries. */ ! final IEntryIterator itr = root.rangeIterator(null, null, null); while (itr.hasNext()) { *************** *** 420,424 **** // Lookup the entry in the global scope. ! Value baseEntry = (Value) globalScope.lookup(key); /* --- 413,417 ---- // Lookup the entry in the global scope. ! Value baseEntry = (Value) globalScope.getValue(key); /* *************** *** 488,492 **** } ! if(tmp!=null) { /* --- 481,485 ---- } ! if (tmp != null) { /* *************** *** 526,534 **** /* ! * Note: we must use the implementation of this method on the super ! * class in order to visit the IValue objects and see both deleted ! * and undeleted entries. */ ! final IEntryIterator itr = super.entryIterator(); while (itr.hasNext()) { --- 519,526 ---- /* ! * Note: the iterator is chosen carefully in order to visit the IValue ! * objects and see both deleted and undeleted entries. */ ! final IEntryIterator itr = root.rangeIterator(null, null, null); while (itr.hasNext()) { *************** *** 538,542 **** // The corresponding key. ! final byte[] id = (byte[]) itr.getKey(); if (entry.deleted) { --- 530,534 ---- // The corresponding key. ! final byte[] key = (byte[]) itr.getKey(); if (entry.deleted) { *************** *** 549,559 **** */ ! if (globalScope.contains(id)) { /* * Mark the entry in the unisolated index as deleted. */ ! globalScope.insert(id, new Value( ! entry.nextVersionCounter(), true, null)); } else { --- 541,552 ---- */ ! if (globalScope.contains(key)) { /* * Mark the entry in the unisolated index as deleted. */ ! // globalScope.insert(key, new Value( ! // entry.nextVersionCounter(), true, null)); ! globalScope.remove(key); } else { *************** *** 571,576 **** * Copy the entry down onto the global scope. */ ! globalScope.insert(id, new Value(entry.nextVersionCounter(), ! false, entry.datum)); } --- 564,570 ---- * Copy the entry down onto the global scope. */ ! // globalScope.insert(key, new Value(entry.nextVersionCounter(), ! // false, entry.datum)); ! globalScope.insert(key,entry.datum); } |
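The version counter scheme documented in the IValue javadoc above, and exercised by the validate/mergeDown logic in IsolatedBTree, reduces to a per-key compare-and-increment: record the global counter when the transaction first writes the key, re-check it against the then current global scope at validation, and assign the next counter when the write is merged down. The sketch below is a toy model under that description only; all names here (VersionCounterSketch, globalScope, and the helper methods) are hypothetical and this is not the com.bigdata.isolation code.

import java.util.HashMap;
import java.util.Map;

/**
 * Toy illustration of version-counter validation as described in the
 * IValue javadoc. Illustrative only; not the bigdata implementation.
 */
public class VersionCounterSketch {

    // Committed (global) scope: key -> current version counter.
    static final Map<String, Short> globalScope = new HashMap<String, Short>();

    /**
     * The counter a transaction records when it first writes on a key. A key
     * absent from the global scope is presumed to carry counter zero (0),
     * which plays the role of FIRST_VERSION_ISOLATED.
     */
    static short counterOnFirstWrite(String key) {
        Short c = globalScope.get(key);
        return c == null ? (short) 0 : c;
    }

    /**
     * Validation: a write conflicts iff the counter recorded by the
     * transaction no longer agrees with the counter in the then current
     * global scope, i.e., an intervening commit overwrote the key.
     */
    static boolean conflicts(String key, short recorded) {
        return counterOnFirstWrite(key) != recorded;
    }

    // Merge down: copy the write onto the global scope with the next counter.
    static void mergeDown(String key) {
        globalScope.put(key, (short) (counterOnFirstWrite(key) + 1));
    }

    public static void main(String[] args) {
        // tx1 and tx2 both write on "k"; no entry exists yet, so both
        // record the presumed zero (0) counter.
        short tx1Counter = counterOnFirstWrite("k"); // 0
        short tx2Counter = counterOnFirstWrite("k"); // 0

        // tx1 validates cleanly and merges down: "k" now carries counter
        // one (1), playing the role of FIRST_VERSION_UNISOLATED.
        System.out.println("tx1 conflicts: " + conflicts("k", tx1Counter)); // false
        mergeDown("k");

        // tx2 now fails validation: it recorded zero (0), but the global
        // counter is one (1) -- a write-write conflict.
        System.out.println("tx2 conflicts: " + conflicts("k", tx2Counter)); // true
    }
}

Note the asymmetry the javadoc calls out: because an absent key is presumed to carry counter zero, the first merge-down writes counter one, and any concurrent writer that also recorded zero is forced through conflict resolution or abort rather than silently overwriting.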