From: Bryan T. <tho...@us...> - 2007-02-20 00:27:10
|
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv24314/src/java/com/bigdata/journal Modified Files: TransactionServer.java Journal.java BufferMode.java Added Files: ITransactionManager.java Log Message: Worked through basic serialization of commits. Index: BufferMode.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/BufferMode.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** BufferMode.java 16 Oct 2006 14:04:41 -0000 1.4 --- BufferMode.java 20 Feb 2007 00:27:03 -0000 1.5 *************** *** 28,32 **** * </p> */ ! Transient("transient"), /** --- 28,32 ---- * </p> */ ! Transient("Transient"), /** *************** *** 46,50 **** * </p> */ ! Direct("direct"), /** --- 46,50 ---- * </p> */ ! Direct("Direct"), /** *************** *** 65,69 **** * @see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4724038 */ ! Mapped("mapped"), /** --- 65,69 ---- * @see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4724038 */ ! Mapped("Mapped"), /** *************** *** 77,81 **** * </p> */ ! Disk("disk"); private final String name; --- 77,81 ---- * </p> */ ! Disk("Disk"); private final String name; *************** *** 94,99 **** /** ! * Parse a string whose contents must be "transient", "direct", "mapped", or ! * "disk". * * @param s --- 94,99 ---- /** ! * Parse a string whose contents must be "Transient", "Direct", "Mapped", or ! * "Disk". * * @param s --- NEW FILE: ITransactionManager.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 19, 2007 */ package com.bigdata.journal; import com.bigdata.objndx.IIndex; /** * A client-facing interface for managing transaction life cycles. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public interface ITransactionManager { /** * Create a new fully-isolated read-write transaction. * * @return The transaction start time, which serves as the unique identifier * for the transaction. */ public long newTx(); /** * Create a new fully-isolated transaction. * * @param readOnly * When true, the transaction will reject writes. * * @return The transaction start time, which serves as the unique identifier * for the transaction. */ public long newTx(boolean readOnly); /** * Create a new read-committed transaction. The transaction will reject * writes. Any data committed by concurrent transactions will become visible * to indices isolated by this transaction (hence, "read comitted"). * <p> * This provides more isolation than "read dirty" since the concurrent * transactions MUST commit before their writes become visible to the a * read-committed transaction. * * @return The transaction start time, which serves as the unique identifier * for the transaction. */ public long newReadCommittedTx(); /** * Return the named index as isolated by the transaction. * * @param name * The index name. * @param ts * The transaction start time, which serves as the unique * identifier for the transaction. * * @return The isolated index. * * @exception IllegalArgumentException * if <i>name</i> is <code>null</code> * * @exception IllegalStateException * if there is no active transaction with that timestamp. */ public IIndex getIndex(String name, long ts); /** * Abort the transaction. * * @param ts * The transaction start time, which serves as the unique * identifier for the transaction. * * @exception IllegalStateException * if there is no active transaction with that timestamp. */ public void abort(long ts); /** * Commit the transaction. * * @param ts * The transaction start time, which serves as the unique * identifier for the transaction. * * @return The commit timestamp assigned to the transaction. * * @exception IllegalStateException * if there is no active transaction with that timestamp. */ public long commit(long ts); } Index: TransactionServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/TransactionServer.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** TransactionServer.java 19 Feb 2007 19:00:20 -0000 1.3 --- TransactionServer.java 20 Feb 2007 00:27:03 -0000 1.4 *************** *** 249,276 **** /** - * A blocking queue that imposes serializability on transactions. A writable - * transaction that attempts to {@link ITx#prepare()} is placed onto this - * queue. When its turn comes, it will validate its write set. - * - * FIXME this really belongs in the {@link TransactionServer} rather than - * the {@link Journal}. The {@link TransactionServer} is responsible for - * serializing transactions and coordinating 2-phase commits. It should - * accomplish this by placing transactions that issue COMMIT requests onto a - * queue that imposes serial execution of the commit protocol. The - * transaction on the head of the queue will first prepare and then commit - * as soon as it is prepared. If the transaction fails validation, then it - * must be aborted, but it could be retried by the client. The application - * should only request a COMMIT. The {@link TransactionServer} is - * responsible for issuing PREPARE requests to all resources on which writes - * have been made during the transaction and then issuing COMMIT requests to - * those resources once they have all suceessfully prepared. - * - * @todo a concurrent hash map for the preparing/committing transactions - * could be kept locally on the journal in order to detect violations - * of serializability by the {@link TransactionServer}. - */ - final BlockingQueue<ITx> commitQueue = new LinkedBlockingQueue<ITx>(); - - /** * Map containing metadata for active transactions. */ --- 249,252 ---- Index: Journal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Journal.java,v retrieving revision 1.53 retrieving revision 1.54 diff -C2 -d -r1.53 -r1.54 *** Journal.java 19 Feb 2007 19:00:20 -0000 1.53 --- Journal.java 20 Feb 2007 00:27:03 -0000 1.54 *************** *** 53,56 **** --- 53,60 ---- import java.util.Map; import java.util.Properties; + import java.util.concurrent.Callable; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; import org.apache.log4j.Level; *************** *** 159,183 **** * </ol> * - * FIXME The notion of a committed state needs to be captured by a persistent - * structure in the journal until (a) there are no longer any active - * transactions that can read from that committed state; and (b) the slots - * allocated to that committed state have been released on the journal. Those - * commit states need to be locatable on the journal, suggesting a record - * written by PREPARE and finalized by COMMIT. - * - * @todo Work out protocol for shutdown with the single-threaded journal server. - * - * @todo Normal transaction operations need to be interleaved with operations to - * migrate committed data to the read-optimized database; with operations - * to logically delete data versions (and their slots) on the journal once - * those version are no longer readable by any active transaction; and - * with operations to compact the journal (extending can occur during - * normal transaction operations). One approach is to implement - * thread-checking using thread local variables or the ability to assign - * the journal to a thread and then hand off the journal to the thread for - * the activity that needs to be run, returning control to the normal - * transaction thread on (or shortly after) interrupt or when the - * operation is finished. - * * @todo There is a dependency in a distributed database architecture on * transaction begin time. A very long running transaction could force the --- 163,166 ---- *************** *** 206,211 **** * the same object back when you ask for an isolated named index). * - * FIXME Write test suites for the {@link TransactionServer}. - * * @todo I need to revisit the assumptions for very large objects in the face of * the recent / planned redesign. I expect that using an index with a key --- 189,192 ---- *************** *** 214,218 **** * directed to a journal using a disk-only mode. */ ! public class Journal implements IJournal { /** --- 195,199 ---- * directed to a journal using a disk-only mode. */ ! public class Journal implements IJournal, ITransactionManager { /** *************** *** 253,257 **** * The service used to generate commit timestamps. * ! * @todo paramterize using {@link Options} so that we can resolve a * low-latency service for use with a distributed database commit * protocol. --- 234,238 ---- * The service used to generate commit timestamps. * ! * @todo parameterize using {@link Options} so that we can resolve a * low-latency service for use with a distributed database commit * protocol. *************** *** 318,334 **** /** - * A hash map containing all active transactions. A transaction that is - * preparing will be in this collection until it has either successfully - * prepared or aborted. - */ - final Map<Long, ITx> activeTx = new HashMap<Long, ITx>(); - - /** - * A hash map containing all transactions that have prepared but not yet - * either committed or aborted. - */ - final Map<Long, ITx> preparedTx = new HashMap<Long, ITx>(); - - /** * Create or open a journal. * --- 299,302 ---- *************** *** 710,728 **** /** ! * Shutdown the journal politely. ! * ! * @exception IllegalStateException ! * if there are active transactions. ! * @exception IllegalStateException ! * if there are prepared transactions. ! * ! * @todo Workout protocol for shutdown of the journal, including forced ! * shutdown when there are active or prepar(ed|ing) transactions, ! * timeouts on transactions during shutdown, notification of abort for ! * transactions that do not complete in a timely manner, and ! * survivability of prepared transactions across restart. Reconcile ! * the semantics of this method with those declared by the raw store ! * interface, probably by declaring a variant that accepts parameters ! * specifying how to handle the shutdown (immediate vs wait). */ public void shutdown() { --- 678,684 ---- /** ! * Shutdown the journal politely. Active transactions and transactions ! * pending commit will run to completion, but no new transactions will be ! * accepted. */ public void shutdown() { *************** *** 730,753 **** assertOpen(); ! final int nactive = activeTx.size(); ! ! if (nactive > 0) { ! ! throw new IllegalStateException("There are " + nactive ! + " active transactions"); ! ! } ! ! final int nprepare = preparedTx.size(); ! ! if (nprepare > 0) { ! ! throw new IllegalStateException("There are " + nprepare ! + " prepared transactions."); ! ! } // close immediately. close(); } --- 686,705 ---- assertOpen(); ! final long begin = System.currentTimeMillis(); ! ! log.warn("#active="+activeTx.size()+", shutting down..."); ! ! /* ! * allow all pending tasks to complete, but no new tasks will be ! * accepted. ! */ ! commitService.shutdown(); // close immediately. close(); + + final long elapsed = System.currentTimeMillis() - begin; + + log.warn("Journal is shutdown: elapsed="+elapsed); } *************** *** 760,763 **** --- 712,718 ---- assertOpen(); + // force the commit thread to quit immediately. + commitService.shutdownNow(); + _bufferStrategy.close(); *************** *** 1362,1373 **** /* ! * transaction support. */ /** ! * Create a new fully-isolated read-write transaction. * ! * @see #newTx(boolean), to which this method delegates its implementation. */ public long newTx() { --- 1317,1354 ---- /* ! * ITransactionManager and friends. ! * ! * @todo refactor into a service. provide an implementation that supports ! * only a single Journal resource and an implementation that supports a ! * scale up/out architecture. the journal should resolve the service using ! * JINI. the timestamp service should probably be co-located with the ! * transaction service. */ /** ! * A hash map containing all active transactions. A transaction that is ! * preparing will be in this collection until it has either successfully ! * prepared or aborted. ! */ ! final Map<Long, ITx> activeTx = new HashMap<Long, ITx>(); ! ! /** ! * A hash map containing all transactions that have prepared but not yet ! * either committed or aborted. * ! * @todo this is probably useless. A transaction will be in this map only ! * while it is actively committing. ! */ ! final Map<Long, ITx> preparedTx = new HashMap<Long, ITx>(); ! ! /** ! * A thread that imposes serializability on transactions. A writable ! * transaction that attempts to {@link #commit()} is added as a ! * {@link CommitTask} and queued for execution by this thread. When its turn ! * comes, it will validate its write set and commit iff validation succeeds. */ + final ExecutorService commitService = Executors + .newSingleThreadExecutor(Executors.defaultThreadFactory()); + public long newTx() { *************** *** 1376,1393 **** } - /** - * Create a new fully-isolated transaction. - * - * @param readOnly - * When true, the transaction will reject writes. - * - * @todo This method supports transactions in a non-distributed database in - * which there is a centralized {@link Journal} that handles all - * concurrency control. There needs to be a {@link TransactionServer} - * that starts transactions. The {@link JournalServer} should summon a - * transaction object them into being on a {@link Journal} iff - * operations isolated by that transaction are required on that - * {@link Journal}. - */ public long newTx(boolean readOnly) { --- 1357,1360 ---- *************** *** 1397,1410 **** } - /** - * Create a new read-committed transaction. - * - * @return A transaction that will reject writes. Any committed data will be - * visible to indices isolated by this transaction. - * - * @todo implement read-committed transaction support. - * - * @see #newTx(boolean) - */ public long newReadCommittedTx() { --- 1364,1367 ---- *************** *** 1413,1432 **** } - /** - * Return the named index as isolated by the transaction. - * - * @param name - * The index name. - * @param ts - * The start time of the transaction. - * - * @return The isolated index. - * - * @exception IllegalArgumentException - * if <i>name</i> is <code>null</code> - * - * @exception IllegalStateException - * if there is no active transaction with that timestamp. - */ public IIndex getIndex(String name, long ts) { --- 1370,1373 ---- *************** *** 1452,1462 **** } - /** - * Commit the transaction on the journal. - * - * @param ts The transaction start time. - * - * @return The commit timestamp assigned to the transaction. - */ public long commit(long ts) { --- 1393,1396 ---- *************** *** 1465,1472 **** if (tx == null) throw new IllegalArgumentException("No such tx: " + ts); ! tx.prepare(); ! return tx.commit(); } --- 1399,1471 ---- if (tx == null) throw new IllegalArgumentException("No such tx: " + ts); + + if(tx.isReadOnly()) { ! tx.prepare(); ! return tx.commit(); ! ! } ! ! try { ! ! long commitTime = commitService.submit(new CommitTask(tx)).get(); ! ! if(DEBUG) { ! ! log.debug("committed: startTime="+tx.getStartTimestamp()+", commitTime="+commitTime); ! ! } ! ! return commitTime; ! ! } catch(InterruptedException ex) { ! ! // interrupted, perhaps during shutdown. ! throw new RuntimeException(ex); ! ! } catch(ExecutionException ex) { ! ! Throwable cause = ex.getCause(); ! ! if(cause instanceof ValidationError) { ! ! throw (ValidationError) cause; ! ! } ! ! // this is an unexpected error. ! throw new RuntimeException(cause); ! ! } ! ! } ! ! /** ! * Task validates and commits a transaction when it is run by the ! * {@link Journal#commitService}. ! * ! * @author <a href="mailto:tho...@us...">Bryan Thompson</a> ! * @version $Id$ ! */ ! private static class CommitTask implements Callable<Long> { ! ! private final ITx tx; ! ! public CommitTask(ITx tx) { ! ! assert tx != null; ! ! this.tx = tx; ! ! } ! ! public Long call() throws Exception { ! ! tx.prepare(); ! ! return tx.commit(); ! ! } } *************** *** 1480,1483 **** --- 1479,1485 ---- * * @throws IllegalStateException + * + * @todo test for transactions that have already been completed? that would + * represent a protocol error in the transaction manager service. */ protected void activateTx(ITx tx) throws IllegalStateException { |