Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv5461/src/java/com/bigdata/journal Modified Files: TransientBufferStrategy.java AbstractBufferStrategy.java TemporaryStore.java Name2Addr.java IJournal.java IBufferStrategy.java CommitRecordIndex.java ICommitRecord.java DiskOnlyStrategy.java ForceEnum.java CommitRecord.java Tx.java ITx.java BasicBufferStrategy.java MappedBufferStrategy.java BlockWriteCache.java Options.java DirectBufferStrategy.java FileMetadata.java Journal.java CommitRecordSerializer.java Added Files: IMROW.java TemporaryRawStore.java Removed Files: DataDeletedException.java PageCommitList.java Log Message: Further work supporting transactional isolation. Index: ForceEnum.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/ForceEnum.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** ForceEnum.java 31 Oct 2006 17:11:58 -0000 1.1 --- ForceEnum.java 21 Feb 2007 20:17:21 -0000 1.2 *************** *** 65,69 **** * The file is not forced to stable storage. */ ! No("no"), /** --- 65,69 ---- * The file is not forced to stable storage. */ ! No("No"), /** *************** *** 72,76 **** * @see FileChannel#force(boolean) */ ! Force("force"), /** --- 72,76 ---- * @see FileChannel#force(boolean) */ ! Force("Force"), /** *************** *** 79,83 **** * @see FileChannel#force(boolean) */ ! ForceMetadata("metadata"); private String name; --- 79,83 ---- * @see FileChannel#force(boolean) */ ! ForceMetadata("ForceMetadata"); private String name; *************** *** 94,98 **** /** ! * Parse a string whose contents must be "no", "force", "forceMetadata". * * @param s --- 94,98 ---- /** ! * Parse a string whose contents must be "No", "Force", "ForceMetadata". 
* * @param s Index: Name2Addr.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Name2Addr.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** Name2Addr.java 17 Feb 2007 21:34:17 -0000 1.3 --- Name2Addr.java 21 Feb 2007 20:17:21 -0000 1.4 *************** *** 43,46 **** --- 43,49 ---- * the use of the {@link #keyBuilder} and therefore minimizes the relatively * expensive operation of encoding unicode names to byte[] keys. + * + * @todo use a weak value cache so that unused indices may be swept by the + * GC. */ private Map<String,IIndex> name2BTree = new HashMap<String,IIndex>(); *************** *** 144,149 **** } ! /* re-load btree from the store. ! */ btree = BTreeMetadata.load(this.store, entry.addr); --- 147,151 ---- } ! // re-load btree from the store. btree = BTreeMetadata.load(this.store, entry.addr); Index: MappedBufferStrategy.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/MappedBufferStrategy.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** MappedBufferStrategy.java 8 Feb 2007 21:32:10 -0000 1.8 --- MappedBufferStrategy.java 21 Feb 2007 20:17:21 -0000 1.9 *************** *** 5,8 **** --- 5,9 ---- import com.bigdata.rawstore.Addr; + import com.bigdata.scaleup.PartitionedJournal; /** *************** *** 26,29 **** --- 27,38 ---- * @see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4724038 * @see BufferMode#Mapped + * + * @todo since the mapped file can not be extended or truncated, use of mapped + * files pretty much suggests that we pre-extend to the maximum allowable + * extent. If something cannot be committed due to overflow, then a tx + * could be re-committed after a rollover of a {@link PartitionedJournal}.
+ * Note that the use of mapped files might not prove worth the candle due + * to the difficulties with resource deallocation for this strategy and + * the good performance of some alternative strategies. */ public class MappedBufferStrategy extends DiskBackedBufferStrategy { *************** *** 39,42 **** --- 48,57 ---- */ final MappedByteBuffer mappedBuffer; + + public boolean isFullyBuffered() { + + return false; + + } MappedBufferStrategy(long maximumExtent, FileMetadata fileMetadata) { --- NEW FILE: IMROW.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. 
Modifications: */ /* * Created on Feb 20, 2007 */ package com.bigdata.journal; import com.bigdata.rawstore.IRawStore; /** * A marker interface for a store that supports MROW (Multiple Readers, One * Writer) access. It declares no methods of its own beyond those inherited * from {@link IRawStore}; it only tags implementations so that callers can * test for the capability. (NOTE(review): the concrete concurrency guarantee * is defined by each implementing store - confirm there.) * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public interface IMROW extends IRawStore { } --- DataDeletedException.java DELETED --- Index: CommitRecordSerializer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/CommitRecordSerializer.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** CommitRecordSerializer.java 17 Feb 2007 21:34:18 -0000 1.1 --- CommitRecordSerializer.java 21 Feb 2007 20:17:21 -0000 1.2 *************** *** 73,76 **** --- 73,78 ---- final long timestamp = commitRecord.getTimestamp(); + final long commitCounter = commitRecord.getCommitCounter(); + final int n = commitRecord.getRootAddrCount(); *************** *** 85,88 **** --- 87,92 ---- dos.writeLong(timestamp); + + dos.writeLong(commitCounter); LongPacker.packLong(dos, n); *************** *** 119,122 **** --- 123,128 ---- final long timestamp = dis.readLong(); + final long commitCounter = dis.readLong(); + final int n = (int)LongPacker.unpackLong(dis); *************** *** 129,133 **** } ! return new CommitRecord(timestamp,roots); } catch (IOException ex) { --- 135,139 ---- } ! return new CommitRecord(timestamp,commitCounter,roots); } catch (IOException ex) { Index: ICommitRecord.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/ICommitRecord.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** ICommitRecord.java 17 Feb 2007 21:34:17 -0000 1.1 --- ICommitRecord.java 21 Feb 2007 20:17:21 -0000 1.2 *************** *** 86,90 **** */ public long getTimestamp(); ! /** * The #of allowed root addresses.
--- 86,97 ---- */ public long getTimestamp(); ! ! /** ! * The commit counter associated with the commit record. This is used by ! * transactions in order to determine whether or not intervening commits ! * have occurred since the transaction start time. ! */ ! public long getCommitCounter(); ! /** * The #of allowed root addresses. Index: ITx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/ITx.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** ITx.java 19 Feb 2007 01:05:39 -0000 1.3 --- ITx.java 21 Feb 2007 20:17:21 -0000 1.4 *************** *** 67,71 **** /** ! * Prepare the transaction for a {@link #commit()}. * * @exception IllegalStateException --- 67,71 ---- /** ! * Validate the write set for the transaction. * * @exception IllegalStateException Index: AbstractBufferStrategy.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** AbstractBufferStrategy.java 15 Feb 2007 22:01:18 -0000 1.10 --- AbstractBufferStrategy.java 21 Feb 2007 20:17:21 -0000 1.11 *************** *** 5,9 **** import java.nio.ByteBuffer; import java.nio.channels.FileChannel; - import java.text.Format; import java.text.NumberFormat; --- 5,8 ---- Index: Journal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Journal.java,v retrieving revision 1.54 retrieving revision 1.55 diff -C2 -d -r1.54 -r1.55 *** Journal.java 20 Feb 2007 00:27:03 -0000 1.54 --- Journal.java 21 Feb 2007 20:17:21 -0000 1.55 *************** *** 50,57 **** import java.io.File; import java.nio.ByteBuffer; - import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; --- 50,57 ---- import java.io.File; import java.nio.ByteBuffer; import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; *************** *** 66,69 **** --- 66,70 ---- import com.bigdata.rawstore.Addr; import com.bigdata.rawstore.Bytes; + import com.bigdata.util.concurrent.DaemonThreadFactory; /** *************** *** 135,139 **** * writes and deletes on the journal). (However, note transaction processing MAY * be concurrent since the write set of a transaction is written on a ! * {@link TemporaryStore}.) * </p> * --- 136,140 ---- * writes and deletes on the journal). (However, note transaction processing MAY * be concurrent since the write set of a transaction is written on a ! * {@link TemporaryRawStore}.) * </p> * *************** *** 195,199 **** * directed to a journal using a disk-only mode. */ ! public class Journal implements IJournal, ITransactionManager { /** --- 196,200 ---- * directed to a journal using a disk-only mode. */ ! public class Journal implements IJournal { /** *************** *** 752,755 **** --- 753,762 ---- } + public boolean isFullyBuffered() { + + return _bufferStrategy.isFullyBuffered(); + + } + /** * Return a read-only view of the current root block. *************** *** 840,844 **** public void abort() { ! // clear the root addresses - the will be reloaded. _commitRecord = null; --- 847,851 ---- public void abort() { ! // clear the root addresses - they will be reloaded. _commitRecord = null; *************** *** 932,937 **** */ final ICommitRecord commitRecord = new CommitRecord(commitTimestamp, ! 
rootAddrs); final long commitRecordAddr = write(ByteBuffer --- 939,948 ---- */ + final IRootBlockView old = _rootBlock; + + final long newCommitCounter = old.getCommitCounter() + 1; + final ICommitRecord commitRecord = new CommitRecord(commitTimestamp, ! newCommitCounter, rootAddrs); final long commitRecordAddr = write(ByteBuffer *************** *** 978,983 **** { - final IRootBlockView old = _rootBlock; - /* * Update the firstTxId the first time a transaction commits and the --- 989,992 ---- *************** *** 1011,1015 **** !old.isRootBlock0(), old.getSegmentId(), _bufferStrategy .getNextOffset(), firstTxId, lastTxId, ! commitTimestamp, old.getCommitCounter() + 1, commitRecordAddr, commitRecordIndexAddr); --- 1020,1024 ---- !old.isRootBlock0(), old.getSegmentId(), _bufferStrategy .getNextOffset(), firstTxId, lastTxId, ! commitTimestamp, newCommitCounter, commitRecordAddr, commitRecordIndexAddr); *************** *** 1068,1076 **** } ! public ByteBuffer read(long addr, ByteBuffer dst) { assertOpen(); ! return _bufferStrategy.read(addr, dst); } --- 1077,1085 ---- } ! public ByteBuffer read(long addr) { assertOpen(); ! return _bufferStrategy.read(addr); } *************** *** 1105,1115 **** if (commitRecordAddr == 0L) { ! _commitRecord = new CommitRecord(0L); } else { _commitRecord = CommitRecordSerializer.INSTANCE ! .deserialize(_bufferStrategy.read(commitRecordAddr, ! null)); } --- 1114,1123 ---- if (commitRecordAddr == 0L) { ! _commitRecord = new CommitRecord(); } else { _commitRecord = CommitRecordSerializer.INSTANCE ! .deserialize(_bufferStrategy.read(commitRecordAddr)); } *************** *** 1287,1295 **** * that satisify the probe. * ! * @todo this implies that timestamps for all commits are generated by a ! * global timestamp service. */ public ICommitRecord getCommitRecord(long timestamp) { ! return _commitRecordIndex.find(timestamp); --- 1295,1306 ---- * that satisify the probe. * ! * @todo the {@link CommitRecordIndex} is a possible source of thread ! 
* contention since transactions need to use this code path in order ! * to locate named indices but the {@link #commitService} can also ! * write on this index. I have tried some different approaches to ! * handling this. */ public ICommitRecord getCommitRecord(long timestamp) { ! return _commitRecordIndex.find(timestamp); *************** *** 1331,1344 **** * prepared or aborted. */ ! final Map<Long, ITx> activeTx = new HashMap<Long, ITx>(); /** * A hash map containing all transactions that have prepared but not yet ! * either committed or aborted. ! * ! * @todo this is probably useless. A transaction will be in this map only ! * while it is actively committing. */ ! final Map<Long, ITx> preparedTx = new HashMap<Long, ITx>(); /** --- 1342,1354 ---- * prepared or aborted. */ ! final Map<Long, ITx> activeTx = new ConcurrentHashMap<Long, ITx>(); /** * A hash map containing all transactions that have prepared but not yet ! * either committed or aborted. A transaction will be in this map only while ! * it is actively committing, so this is always a "map" of one and could be ! * replaced by a scalar reference. */ ! final Map<Long, ITx> preparedTx = new ConcurrentHashMap<Long, ITx>(); /** *************** *** 1349,1354 **** */ final ExecutorService commitService = Executors ! .newSingleThreadExecutor(Executors.defaultThreadFactory()); ! public long newTx() { --- 1359,1365 ---- */ final ExecutorService commitService = Executors ! .newSingleThreadExecutor(DaemonThreadFactory ! .defaultThreadFactory()); ! public long newTx() { *************** *** 1359,1362 **** --- 1370,1376 ---- public long newTx(boolean readOnly) { + // System.err.println("#active=" + activeTx.size() + ", #committed=" + // + _rootBlock.getCommitCounter()); + return new Tx(this, timestampFactory.nextTimestamp(), readOnly) .getStartTimestamp(); *************** *** 1389,1396 **** throw new IllegalArgumentException("No such tx: " + ts); ! 
tx.abort(); } public long commit(long ts) { --- 1403,1427 ---- throw new IllegalArgumentException("No such tx: " + ts); ! if(tx.isReadOnly()) return; ! ! // queue the abort request. ! commitService.submit(new AbortTask(tx)); } + /** + * @todo Implement group commit. large transactions do not get placed into + * commit groups. Group commit collects up to N 'small' transactions + * with at most M milliseconds of latency and perform a single atomic + * commit for that group of transactions, forcing the data to disk and + * then returning control to the clients. Note that we need to collect + * transactions once they have completed but not yet validated. Any + * transaction in the group that fails validation must be aborted. + * Validation needs to be against then current state of the database, + * so the process is serialized (validate+commit) with transactions + * that fail validation being placed onto an abort queue. A single + * unisolated commit is then performed for the group, so there is one + * SYNC per commit. + */ public long commit(long ts) { *************** *** 1400,1403 **** --- 1431,1444 ---- throw new IllegalArgumentException("No such tx: " + ts); + /* + * A read-only transaction can commit immediately since validation and + * commit are basically NOPs. + * + * @todo We could also detect transactions with empty write sets (no + * touched indices) and shortcut the prepare/commit for those + * transactions as well. The easy way to do this is to rangeCount each + * index isolated by the transaction. + */ + if(tx.isReadOnly()) { *************** *** 1472,1475 **** --- 1513,1549 ---- /** + * Task aborts a transaction when it is run by the + * {@link Journal#commitService}. This is used to serialize some abort + * processing which would otherwise be concurrent with commit processing. 
* {@link Journal#abort()} makes some assumptions that it is a + * single-threaded environment and those assumptions would be violated + * without serializing aborts and commits on the same queue. The value + * returned by {@link #call()} is always 0L and carries no information. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ + private static class AbortTask implements Callable<Long> { + + private final ITx tx; + + public AbortTask(ITx tx) { + + assert tx != null; + + this.tx = tx; + + } + + public Long call() throws Exception { + + tx.abort(); + + return 0L; + + } + + } + + /** * Notify the journal that a new transaction is being activated (starting on * the journal). *************** *** 1556,1560 **** * Lookup an active or prepared transaction (exact match). * ! * @param startTimestamp * The start timestamp for the transaction. * --- 1630,1634 ---- * Lookup an active or prepared transaction (exact match). * ! * @param startTime * The start timestamp for the transaction. * *************** *** 1563,1573 **** * transaction. */ ! public ITx getTx(long startTimestamp) { ! ITx tx = activeTx.get(startTimestamp); if (tx == null) { ! tx = preparedTx.get(startTimestamp); } --- 1637,1647 ---- * transaction. */ ! public ITx getTx(long startTime) { ! ITx tx = activeTx.get(startTime); if (tx == null) { ! tx = preparedTx.get(startTime); } --- NEW FILE: TemporaryRawStore.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License.
You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 15, 2007 */ package com.bigdata.journal; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import com.bigdata.rawstore.Addr; import com.bigdata.rawstore.Bytes; import com.bigdata.rawstore.IRawStore; /** * A non-restart-safe store for temporary data that buffers data in memory until * a maximum capacity has been reached and then converts to a disk-based store * with a maximum capacity of 2G. The maximum capacity constraint is imposed by * {@link Addr}. On conversion to a disk-backed store, the disk file is created * using the temporary file mechansism and is marked for eventual deletion no * later than when the JVM exits and as soon as the store is {@link #close()}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @todo The {@link TemporaryRawStore} would benefit from any caching or AIO * solutions developed for the {@link DiskOnlyStrategy}. * * @todo The temporary store does NOT currently support MROW since it does not * lock out readers when converting from a {@link TransientBufferStrategy} * to a {@link DiskOnlyStrategy}. 
At this time, MROW is not required for * the {@link TemporaryRawStore} since it is used by a single-threaded * process such as a transaction and not by multiple threads (unlike the * Journal which does support MROW). */ public class TemporaryRawStore implements IRawStore { protected final static int DEFAULT_INITIAL_IN_MEMORY_EXTENT = Bytes.megabyte32 * 10; protected final static int DEFAULT_MAXIMUM_IN_MEMORY_EXTENT = Bytes.megabyte32 * 100; /** * The initial size of the in-memory buffer. This buffer will grow as * necessary until {@link #maximumInMemoryExtent} at which point the store * will convert over to a disk-based mechanism. */ public final long initialInMemoryExtent; /** * The maximum capacity of the in-memory buffer. */ public final long maximumInMemoryExtent; /** * Whether or not a direct buffer was used for the in-memory store (not * recommended). */ public final boolean useDirectBuffers; private boolean open = true; private IBufferStrategy buf; public IBufferStrategy getBufferStrategy() { return buf; } /** * Create a {@link TemporaryRawStore} with an initial in-memory capacity of 10M * that will grow up to 100M before converting into a disk-based store * backed by a temporary file. * * @todo the memory growth strategy does not respect the in-memory maximum * without parameterizing and overriding the #of bytes to extent the * store on overflow for {@link TransientBufferStrategy} */ public TemporaryRawStore() { this( DEFAULT_INITIAL_IN_MEMORY_EXTENT, DEFAULT_MAXIMUM_IN_MEMORY_EXTENT, false ); } /** * Create a {@link TemporaryRawStore} with the specified configuration. * * @param initialInMemoryExtent * The initial size of the in-memory buffer. This buffer will * grow as necessary until <i>maximumInMemoryExtent</i> at which * point the store will convert over to a disk-based mechanism. * @param maximumInMemoryExtent * The maximum capacity of the in-memory buffer. The actual * maximum may differ slightly based on the buffer growth policy. 
* @param useDirectBuffers * Whether or not the in-memory buffer will be direct. The use of * a direct buffer here is NOT recommended. */ public TemporaryRawStore(long initialInMemoryExtent, long maximumInMemoryExtent, boolean useDirectBuffers) { buf = new TransientBufferStrategy(initialInMemoryExtent, maximumInMemoryExtent, useDirectBuffers); this.initialInMemoryExtent = initialInMemoryExtent; this.maximumInMemoryExtent = maximumInMemoryExtent; this.useDirectBuffers = useDirectBuffers; } /** * Close the store and delete the associated file, if any. */ public void close() { if(!open) throw new IllegalStateException(); open = false; buf.close(); buf.deleteFile(); buf = null; } public void force(boolean metadata) { if(!open) throw new IllegalStateException(); buf.force(metadata); } public boolean isOpen() { return open; } /** * Always returns <code>false</code> since the store will be deleted as soon * as it is closed. */ public boolean isStable() { if(!open) throw new IllegalStateException(); return false; } public boolean isFullyBuffered() { if(!open) throw new IllegalStateException(); return buf.isFullyBuffered(); } public ByteBuffer read(long addr) { if(!open) throw new IllegalStateException(); return buf.read(addr); } public long write(ByteBuffer data) { if(!open) throw new IllegalStateException(); try { return buf.write(data); } catch(OverflowException ex) { if(buf instanceof TransientBufferStrategy) { overflowToDisk(); return buf.write(data); } else { throw ex; } } } protected void overflowToDisk() { System.err.println("TemporaryRawStore: overflow to disk; nbytes=" + buf.getNextOffset()); TransientBufferStrategy tmp = (TransientBufferStrategy)buf; int segmentId = 0; File file = null; // request a unique filename. /* * Set the initial extent to be large enough for the root blocks plus * twice the data in the in-memory buffer. 
*/ long initialExtent = FileMetadata.headerSize0 + tmp.getUserExtent() * 2; final boolean create = true; final boolean readOnly = false; // Do not force writes since the store is not restart safe. final ForceEnum forceWrites = ForceEnum.No; /* Create a unique store file and setup the root blocks. The file * will be pre-extended to the requested initialExtent. */ FileMetadata fileMetadata = new FileMetadata(segmentId, file, BufferMode.Disk, useDirectBuffers, initialExtent, create, readOnly, forceWrites); // Mark the file for deletion on exit. fileMetadata.file.deleteOnExit(); // Open the disk-based store file. DiskOnlyStrategy diskBuf = new DiskOnlyStrategy(Bytes.gigabyte * 2, fileMetadata); try { /* * Transfer the data from the in-memory buffer to the disk. The * write begins immediately after the file header. */ // setup the transfer source. ByteBuffer b = tmp.directBuffer; b.limit(tmp.nextOffset); b.position(0); // write the data on the channel. diskBuf.channel.write(b,diskBuf.headerSize); // increment the offset. diskBuf.nextOffset += tmp.nextOffset; } catch(IOException ex) { try { diskBuf.close(); } catch (Throwable ex2) { } throw new RuntimeException(); } this.buf = diskBuf; } } Index: IJournal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/IJournal.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** IJournal.java 17 Feb 2007 21:34:17 -0000 1.5 --- IJournal.java 21 Feb 2007 20:17:21 -0000 1.6 *************** *** 64,68 **** * @version $Id$ */ ! public interface IJournal extends IRawStore, IAtomicStore, IIndexManager { /** --- 64,69 ---- * @version $Id$ */ ! public interface IJournal extends IMROW, IRawStore, IAtomicStore, ! 
IIndexManager, ITransactionManager { /** Index: CommitRecordIndex.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/CommitRecordIndex.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** CommitRecordIndex.java 17 Feb 2007 21:34:17 -0000 1.1 --- CommitRecordIndex.java 21 Feb 2007 20:17:21 -0000 1.2 *************** *** 66,69 **** --- 66,72 ---- protected byte[] getKey(long commitTime) { + /* + * Note: The {@link KeyBuilder} is NOT thread-safe + */ return keyBuilder.reset().append(commitTime).getKey(); *************** *** 78,82 **** * that commit timestamp. */ ! public boolean hasTimestamp(long commitTime) { return super.contains(getKey(commitTime)); --- 81,85 ---- * that commit timestamp. */ ! synchronized public boolean hasTimestamp(long commitTime) { return super.contains(getKey(commitTime)); *************** *** 95,99 **** * is no {@link ICommitTimestamp} for that commit time. */ ! public ICommitRecord get(long commitTime) { ICommitRecord commitRecord; --- 98,102 ---- * is no {@link ICommitTimestamp} for that commit time. */ ! synchronized public ICommitRecord get(long commitTime) { ICommitRecord commitRecord; *************** *** 141,145 **** * @see #get(long) */ ! public ICommitRecord find(long timestamp) { final int index = findIndexOf(timestamp); --- 144,148 ---- * @see #get(long) */ ! synchronized public ICommitRecord find(long timestamp) { final int index = findIndexOf(timestamp); *************** *** 170,174 **** * defined. */ ! public int findIndexOf(long timestamp) { int pos = super.indexOf(getKey(timestamp)); --- 173,177 ---- * defined. */ ! synchronized public int findIndexOf(long timestamp) { int pos = super.indexOf(getKey(timestamp)); *************** *** 221,226 **** protected ICommitRecord loadCommitRecord(IRawStore store, long addr) { ! return CommitRecordSerializer.INSTANCE.deserialize(store.read(addr, ! 
null)); } --- 224,228 ---- protected ICommitRecord loadCommitRecord(IRawStore store, long addr) { ! return CommitRecordSerializer.INSTANCE.deserialize(store.read(addr)); } *************** *** 244,248 **** * if <i>addr</i> is invalid. */ ! public void add(long commitRecordAddr, ICommitRecord commitRecord) { if (commitRecord == null) --- 246,250 ---- * if <i>addr</i> is invalid. */ ! synchronized public void add(long commitRecordAddr, ICommitRecord commitRecord) { if (commitRecord == null) Index: Options.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Options.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** Options.java 15 Feb 2007 14:23:49 -0000 1.8 --- Options.java 21 Feb 2007 20:17:21 -0000 1.9 *************** *** 153,162 **** * stable storage on a commit (default <code>forceMetadata</code>). * <dl> ! * <dt>no</dt> * <dd>This option is useful when the journal is replicated so that we can * always failover to another server having the same data. Unless the file * is replicated or transient, this mode can lead to lost data if there is a * hardware or software failure.</dd> ! * <dt>force</dt> * <dd>Force the journal contents, but not the file metadata, to stable * storage. The precise semantics of this option are dependent on the OS and --- 153,162 ---- * stable storage on a commit (default <code>forceMetadata</code>). * <dl> ! * <dt>No</dt> * <dd>This option is useful when the journal is replicated so that we can * always failover to another server having the same data. Unless the file * is replicated or transient, this mode can lead to lost data if there is a * hardware or software failure.</dd> ! * <dt>Force</dt> * <dd>Force the journal contents, but not the file metadata, to stable * storage. 
The precise semantics of this option are dependent on the OS and *************** *** 169,173 **** * always a serious loss, but is normally repairable with a file system * repair utility). </dd> ! * <dt>forceMetadata</dt> * <dd>Force the journal contents and the file metadata to stable storage. * This option is the most secure. </dd> --- 169,173 ---- * always a serious loss, but is normally repairable with a file system * repair utility). </dd> ! * <dt>ForceMetadata</dt> * <dd>Force the journal contents and the file metadata to stable storage. * This option is the most secure. </dd> *************** *** 246,250 **** * metadata are forced). */ ! public final static ForceEnum DEFAULT_FORCE_ON_COMMIT = ForceEnum.Force; /** Index: FileMetadata.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/FileMetadata.java,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 *** FileMetadata.java 17 Feb 2007 21:34:18 -0000 1.12 --- FileMetadata.java 21 Feb 2007 20:17:21 -0000 1.13 *************** *** 408,413 **** switch (bufferMode) { case Direct: { ! // Allocate a direct buffer. ! buffer = ByteBuffer.allocateDirect((int) userExtent); // Setup to read data from file into the buffer. buffer.limit(nextOffset); --- 408,415 ---- switch (bufferMode) { case Direct: { ! // Allocate the buffer. ! buffer = (useDirectBuffers ? ByteBuffer ! .allocateDirect((int) userExtent) : ByteBuffer ! .allocate((int) userExtent)); // Setup to read data from file into the buffer. buffer.limit(nextOffset); *************** *** 531,535 **** case Direct: /* ! * Allocate a direct buffer. * * Note that we do not read in any data since no user data has --- 533,537 ---- case Direct: /* ! * Allocate the buffer.
* * Note that we do not read in any data since no user data has *************** *** 537,545 **** * to avoid possible overwrites. */ - // buffer = ByteBuffer.allocateDirect((int) userExtent); buffer = (useDirectBuffers ? ByteBuffer ! .allocateDirect((int)userExtent) ! : ByteBuffer ! .allocate((int)userExtent)); break; case Mapped: --- 539,545 ---- * to avoid possible overwrites. */ buffer = (useDirectBuffers ? ByteBuffer ! .allocateDirect((int) userExtent) : ByteBuffer ! .allocate((int) userExtent)); break; case Mapped: Index: Tx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Tx.java,v retrieving revision 1.31 retrieving revision 1.32 diff -C2 -d -r1.31 -r1.32 *** Tx.java 19 Feb 2007 19:00:20 -0000 1.31 --- Tx.java 21 Feb 2007 20:17:21 -0000 1.32 *************** *** 57,60 **** --- 57,61 ---- import com.bigdata.objndx.IndexSegment; import com.bigdata.objndx.ReadOnlyIndex; + import com.bigdata.rawstore.Bytes; import com.bigdata.scaleup.MetadataIndex; import com.bigdata.scaleup.PartitionedIndex; *************** *** 72,79 **** * </p> * <p> ! * The write set of a transaction is written onto a {@link TemporaryStore}. * Therefore the size limit on the transaction write set is currently 2G, but ! * the transaction will run in memory up to 100M. The {@link TemporaryStore} is ! * closed and any backing file is deleted as soon as the transaction completes. * </p> * <p> --- 73,81 ---- * </p> * <p> ! * The write set of a transaction is written onto a {@link TemporaryRawStore}. * Therefore the size limit on the transaction write set is currently 2G, but ! * the transaction will run in memory up to 100M. The {@link TemporaryRawStore} ! * is closed and any backing file is deleted as soon as the transaction ! * completes. * </p> * <p> *************** *** 90,93 **** --- 92,101 ---- * @version $Id$ * + * @todo track whether or not the transaction has written any isolated data. 
do + * this at the same time that I modify the isolated indices use a + * delegation strategy so that I can trap attempts to access an isolated + * index once the transaction is no longer active. define "active" as + * up to the point where a "commit" or "abort" is _requested_ for the tx. + * * @todo Support read-committed transactions (data committed by _other_ * transactions during the transaction will be visible within that *************** *** 123,127 **** * (so we don't throw it away until no transactions can reach back that * far) as well as an index into the named indices index -- perhaps simply ! * an index by startTimestamp into the root addresses (or whole root block * views, or moving the root addresses out of the root block and into the * store with only the address of the root addresses in the root block). --- 131,135 ---- * (so we don't throw it away until no transactions can reach back that * far) as well as an index into the named indices index -- perhaps simply ! * an index by startTime into the root addresses (or whole root block * views, or moving the root addresses out of the root block and into the * store with only the address of the root addresses in the root block). *************** *** 157,171 **** /** ! * The start startTimestamp assigned to this transaction. * <p> ! * Note: Transaction {@link #startTimestamp} and {@link #commitTimestamp}s * are assigned by a global time service. The time service must provide * unique times for transaction start and commit timestamps and for commit * times for unisolated {@link Journal#commit()}s. */ ! final protected long startTimestamp; /** ! * The commit startTimestamp assigned to this transaction. */ private long commitTimestamp; --- 165,179 ---- /** ! * The start startTime assigned to this transaction. * <p> ! * Note: Transaction {@link #startTime} and {@link #commitTimestamp}s * are assigned by a global time service. 
The time service must provide * unique times for transaction start and commit timestamps and for commit * times for unisolated {@link Journal#commit()}s. */ ! final protected long startTime; /** ! * The commit startTime assigned to this transaction. */ private long commitTimestamp; *************** *** 177,186 **** /** - * The commit counter on the journal as of the time that this transaction - * object was created. - */ - final protected long commitCounter; - - /** * The historical {@link ICommitRecord} choosen as the ground state for this * transaction. All indices isolated by this transaction are isolated as of --- 185,188 ---- *************** *** 192,207 **** /** ! * A store used to hold write sets for the transaction. The same store can ! * be used to buffer resolved write-write conflicts. This uses a ! * {@link TemporaryStore} to avoid issues with maintaining transactions ! * across journal boundaries. This places a limit on transactions of 2G in ! * their serialized write set. Since the indices use a copy-on-write model, ! * the amount of user data can be significantly less due to multiple ! * versions of the same btree nodes. Using smaller branching factors in the ! * isolated index helps significantly to increase the effective utilization ! * of the store since copy-on-write causes fewer bytes to be copied each ! * time it is invoked. */ ! final protected TemporaryStore tmpStore = new TemporaryStore(); /** --- 194,210 ---- /** ! * A store used to hold write sets for read-write transactions (it is null ! * iff the transaction is read-only). The same store can be used to buffer ! * resolved write-write conflicts. This uses a {@link TemporaryRawStore} to ! * avoid issues with maintaining transactions across journal boundaries. ! * This places a limit on transactions of 2G in their serialized write set. ! * <p> ! * Since the indices use a copy-on-write model, the amount of user data can ! * be significantly less due to multiple versions of the same btree nodes. 
! * Using smaller branching factors in the isolated index helps significantly ! * to increase the effective utilization of the store since copy-on-write ! * causes fewer bytes to be copied each time it is invoked. */ ! final protected TemporaryRawStore tmpStore; /** *************** *** 215,219 **** public Tx(Journal journal,long timestamp) { ! this(journal,timestamp,false); } --- 218,222 ---- public Tx(Journal journal,long timestamp) { ! this(journal, timestamp, false); } *************** *** 221,234 **** /** * Create a transaction starting the last committed state of the journal as ! * of the specified startTimestamp. * * @param journal * The journal. * ! * @param startTimestamp ! * The startTimestamp, which MUST be assigned consistently based on a * {@link ITimestampService}. Note that a transaction does not * start up on all {@link Journal}s at the same time. Instead, ! * the transaction start startTimestamp is assigned by a centralized * time service and then provided each time a transaction object * must be created for isolated on some {@link Journal}. --- 224,237 ---- /** * Create a transaction starting the last committed state of the journal as ! * of the specified startTime. * * @param journal * The journal. * ! * @param startTime ! * The startTime, which MUST be assigned consistently based on a * {@link ITimestampService}. Note that a transaction does not * start up on all {@link Journal}s at the same time. Instead, ! * the transaction start startTime is assigned by a centralized * time service and then provided each time a transaction object * must be created for isolated on some {@link Journal}. *************** *** 248,265 **** this.journal = journal; ! this.startTimestamp = timestamp; this.readOnly = readOnly; journal.activateTx(this); /* - * Stash the commit counter so that we can figure out if there were - * concurrent transactions and therefore whether or not we need to - * validate the write set. 
- */ - this.commitCounter = journal.getRootBlockView().getCommitCounter(); - - /* * The commit record serving as the ground state for the indices * isolated by this transaction (MAY be null, in which case the --- 251,267 ---- this.journal = journal; ! this.startTime = timestamp; this.readOnly = readOnly; + + this.tmpStore = readOnly ? null : new TemporaryRawStore( + Bytes.megabyte * 1, // initial in-memory size. + Bytes.megabyte * 10, // maximum in-memory size. + false // do NOT use direct buffers. + ); journal.activateTx(this); /* * The commit record serving as the ground state for the indices * isolated by this transaction (MAY be null, in which case the *************** *** 279,283 **** final public int hashCode() { ! return Long.valueOf(startTimestamp).hashCode(); } --- 281,285 ---- final public int hashCode() { ! return Long.valueOf(startTime).hashCode(); } *************** *** 291,295 **** final public boolean equals(ITx o) { ! return this == o || startTimestamp == o.getStartTimestamp(); } --- 293,297 ---- final public boolean equals(ITx o) { ! return this == o || startTime == o.getStartTimestamp(); } *************** *** 301,310 **** * * @todo verify that this has the semantics of the transaction start time ! * and that the startTimestamp is (must be) assigned by the same service * that assigns the {@link #getCommitTimestamp()}. */ final public long getStartTimestamp() { ! return startTimestamp; } --- 303,312 ---- * * @todo verify that this has the semantics of the transaction start time ! * and that the startTime is (must be) assigned by the same service * that assigns the {@link #getCommitTimestamp()}. */ final public long getStartTimestamp() { ! return startTime; } *************** *** 343,347 **** public String toString() { ! return ""+startTimestamp; } --- 345,349 ---- public String toString() { ! return ""+startTime; } *************** *** 362,371 **** /* ! * Close and delete the TemporaryStore. */ ! if (tmpStore.isOpen()) { tmpStore.close(); ! 
} --- 364,373 ---- /* ! * Close and delete the TemporaryRawStore. */ ! if (tmpStore != null && tmpStore.isOpen()) { tmpStore.close(); ! } *************** *** 396,400 **** /* ! * The commit startTimestamp is assigned when we prepare the transaction * since the the commit protocol does not permit unisolated writes * once a transaction begins to prepar until the transaction has --- 398,402 ---- /* ! * The commit startTime is assigned when we prepare the transaction * since the the commit protocol does not permit unisolated writes * once a transaction begins to prepar until the transaction has *************** *** 415,419 **** */ ! if (!validate()) { abort(); --- 417,421 ---- */ ! if (!validateWriteSets()) { abort(); *************** *** 641,645 **** * Validate all isolated btrees written on by this transaction. */ ! private boolean validate() { assert ! readOnly; --- 643,647 ---- * Validate all isolated btrees written on by this transaction. */ ! private boolean validateWriteSets() { assert ! readOnly; *************** *** 647,656 **** /* * This compares the current commit counter on the journal with the ! * commit counter at the time that this transaction was started. If they ! * are the same, then no intervening commits have occurred on the ! * journal and there is nothing to validate. */ ! if(journal.getRootBlockView().getCommitCounter() == commitCounter) { return true; --- 649,660 ---- /* * This compares the current commit counter on the journal with the ! * commit counter as of the start time for the transaction. If they are ! * the same, then no intervening commits have occurred on the journal ! * and there is nothing to validate. */ ! if (commitRecord == null ! || (journal.getRootBlockView().getCommitCounter() == commitRecord ! 
.getCommitCounter())) { return true; Index: TemporaryStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/TemporaryStore.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** TemporaryStore.java 19 Feb 2007 19:00:23 -0000 1.5 --- TemporaryStore.java 21 Feb 2007 20:17:21 -0000 1.6 *************** *** 43,286 **** */ /* ! * Created on Feb 15, 2007 */ package com.bigdata.journal; - import java.io.File; - import java.io.IOException; - import java.nio.ByteBuffer; - import com.bigdata.objndx.IIndex; import com.bigdata.rawstore.Addr; - import com.bigdata.rawstore.Bytes; - import com.bigdata.rawstore.IRawStore; /** ! * A non-restart-safe store for temporary data that buffers data in memory until ! * a maximum capacity has been reached and then converts to a disk-based store ! * with a maximum capacity of 2G. The maximum capacity constraint is imposed by ! * {@link Addr}. On conversion to a disk-backed store, the disk file is created ! * using the temporary file mechansism and is marked for eventual deletion no ! * later than when the JVM exits and as soon as the store is {@link #close()}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ - * - * @todo The {@link TemporaryStore} would benefit from any caching or AIO - * solutions developed for the {@link DiskOnlyStrategy}. */ ! public class TemporaryStore implements IRawStore, IStore { ! ! /** ! * The initial size of the in-memory buffer. This buffer will grow as ! * necessary until {@link #maximumInMemoryExtent} at which point the store ! * will convert over to a disk-based mechanism. ! */ ! public final long initialInMemoryExtent; ! ! /** ! * The maximum capacity of the in-memory buffer. ! */ ! public final long maximumInMemoryExtent; ! ! /** ! * Whether or not a direct buffer was used for the in-memory store (not ! * recommended). ! */ ! public final boolean useDirectBuffers; ! ! 
private boolean open = true; ! private IBufferStrategy buf; ! ! public IBufferStrategy getBufferStrategy() { ! ! return buf; ! ! } /** ! * Create a {@link TemporaryStore} with an initial in-memory capacity of 10M ! * that will grow up to 100M before converting into a disk-based store ! * backed by a temporary file. */ public TemporaryStore() { ! this( Bytes.megabyte*10, Bytes.megabyte*100, false ); } /** - * Create a {@link TemporaryStore} with the specified configuration. - * * @param initialInMemoryExtent - * The initial size of the in-memory buffer. This buffer will - * grow as necessary until <i>maximumInMemoryExtent</i> at which - * point the store will convert over to a disk-based mechanism. * @param maximumInMemoryExtent - * The maximum capacity of the in-memory buffer. The actual - * maximum may differ slightly based on the buffer growth policy. * @param useDirectBuffers - * Whether or not the in-memory buffer will be direct. The use of - * a direct buffer here is NOT recommended. */ public TemporaryStore(long initialInMemoryExtent, long maximumInMemoryExtent, boolean useDirectBuffers) { - - buf = new TransientBufferStrategy(initialInMemoryExtent, maximumInMemoryExtent, - useDirectBuffers); - - this.initialInMemoryExtent = initialInMemoryExtent; - - this.maximumInMemoryExtent = maximumInMemoryExtent; - - this.useDirectBuffers = useDirectBuffers; - - setupName2AddrBTree(); - - } ! /** ! * Close the store and delete the associated file, if any. ! */ ! public void close() { ! ! if(!open) throw new IllegalStateException(); ! ! open = false; ! ! buf.close(); ! ! buf.deleteFile(); ! ! } ! ! public void force(boolean metadata) { ! ! if(!open) throw new IllegalStateException(); ! ! buf.force(metadata); ! ! } ! ! public boolean isOpen() { ! ! return open; ! ! } ! ! /** ! * Always returns <code>false</code> since the store will be deleted as soon ! * as it is closed. ! */ ! public boolean isStable() { ! return false; ! } ! ! 
public ByteBuffer read(long addr, ByteBuffer dst) { ! ! if(!open) throw new IllegalStateException(); ! ! return buf.read(addr, dst); ! ! } ! ! public long write(ByteBuffer data) { ! ! if(!open) throw new IllegalStateException(); ! ! try { ! ! return buf.write(data); ! ! } catch(OverflowException ex) { ! ! if(buf instanceof TransientBufferStrategy) { ! ! overflowToDisk(); ! ! return buf.write(data); ! ! } else { ! ! throw ex; ! ! } ! ! } ! ! } ! ! protected void overflowToDisk() { ! ! TransientBufferStrategy tmp = (TransientBufferStrategy)buf; ! ! int segmentId = 0; ! ! File file = null; // request a unique filename. ! ! /* ! * Set the initial extent to be large enough for the root blocks plus ! * twice the data in the in-memory buffer. ! */ ! long initialExtent = FileMetadata.headerSize0 + tmp.getUserExtent() * 2; ! ! final boolean create = true; ! ! final boolean readOnly = false; ! ! // Do not force writes since the store is not restart safe. ! final ForceEnum forceWrites = ForceEnum.No; ! ! /* Create a unique store file and setup the root blocks. The file ! * will be pre-extended to the requested initialExtent. ! */ ! FileMetadata fileMetadata = new FileMetadata(segmentId, file, ! BufferMode.Disk, useDirectBuffers, initialExtent, ! create, readOnly, forceWrites); ! ! // Mark the file for deletion on exit. ! fileMetadata.file.deleteOnExit(); ! ! // Open the disk-based store file. ! DiskOnlyStrategy diskBuf = new DiskOnlyStrategy(Bytes.gigabyte * 2, ! fileMetadata); ! ! try { ! /* ! * Transfer the data from the in-memory buffer to the disk. The ! * write begins immediately after the file header. ! */ ! ! // setup the transfer source. ! ByteBuffer b = tmp.directBuffer; ! b.limit(tmp.nextOffset); ! b.position(0); ! ! ... [truncated message content] |