From: Thompson, B. B. <BRY...@sa...> - 2006-03-31 13:05:31
|
Kevin, I sent an earlier email that is hung up waiting on approval in which I agreed with you. However, I am now looking at the code [1] and I am = much less sure. This may be simply that I am unfamiliar with the use of ThreadLocal variables, but the point is to assign on the transaction = object the reference to the thread which is allowed to invoke methods on the transaction object API, right? That is exactly what I believe the code = is doing, and I believe that the field and method names reflect this. The inner Tx class begins around line# 247. Please take a look at this again. If this is not what we discussed, then I do not think that I understand how to apply the thread local variable properly. With reference to Queue.lock() [line# 377] and Queue.unlock() [line# = 422], there are comments in the code that the transaction object needs to be passed into each of these methods. I am working on an update to the = 2PL module that will support this. Thanks, -bryan [1] http://proto.cognitiveweb.org/projects/cweb/multiproject/cweb-concurrent= /xre f/org/CognitiveWeb/concurrent/locking/DefaultTransactionManager.html ________________________________________ From: jdb...@li... [mailto:jdb...@li...] On Behalf Of Kevin = Day=20 Sent: Thursday, March 30, 2006 8:02 PM To: JDBM Developer listserv Subject: [Jdbm-developer] re: DefaultTransactionManager - draft of prototype. Bryan- =A0 I've done a cursory glance through this code (this will be a heck of a = lot easier when we have svn access!)... =A0 The only thing that stands out that I feel needs comments is the currentThread variable...=A0 I think that you want this to be = threadLocalTx. =A0 So, the call to getCurrentThread and setCurrentThread=A0will actually = morph into getCurrentTx and setCurrentTx, etc..., as in: =A0 =A0 =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static=A0Tx getCurrentTx() = { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0return (Tx) threadLocalTx.get(); =A0=A0=A0=A0=A0=A0=A0=A0} =A0 =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static void setCurrentTx(Tx = contextTx) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if (threadLocalTx.get() !=3D null) = { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalStateException("Tx already assigned for this thread"); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0threadLocalTx.set(contextTx); =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Throw exception if the=A0specified=A0lock = context=A0is not bound to the current thread. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static void = assertCurrentTx() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if (threadLocalLockContext.get() = !=3D this) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalStateException("Tx context not bound to this thread"); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0} =A0 =A0 I didn't see an implementation for the Queue class itself, but I = suspect that the Queue.lock() method will wind up accepting a lock context = object as a parameter.=A0 In jdbm, this lock context will be the transaction = object returned by getCurrentTx(). =A0 Or am I maybe missing something important about how the Queue class operates?=A0 It seems to me that the items on the queue must somehow be associated with the transaction that holds the lock (or is waiting for = the lock)... =A0 - K =A0 =A0 =A0=20 > All, Here is some code based on our discussions that attempts to integrate the 2PL support with a transaction factory and support the binding of threads to transactions. =A0Please take a look. =A0I have made some = notes where the code is incomplete, requires features not available, etc. I am not sure where this is going right now. =A0I could see how this could be re-factored into an extensible mechanism for creating transactions and making use of the 2PL facilities, but I am not sure that this much of the control structure belongs in a generic purpose 2PL locking package rather than in some jdbm specific transaction management classes. -bryan package org.CognitiveWeb.concurrent.locking; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; /** * A factory for transaction objects supporting assignment of threads to * transactions. A thread may assign a transaction to itself iff the transaction * is not currently bound to a thread. If a lock request would result in = a * deadlock, then a {@link DeadlockException} is thrown. If a lock = request * blocks, then a transaction will remain bound to that thread until the lock * request is granted, a timeout occurs, or the transaction is aborted. = When a * thread is running, it may release the bound transaction. A = transaction that * is not associated with any thread is not running. If other = transactions are * waiting on a transaction that is not running, then those transactions will * block until the transaction is rebound to another thread and it = releases its * locks. When a transaction completes (either aborts or commits), all = locks * associated with that transaction are released. *=20 * @author <a = Bryan">href=3D"mailto:tho...@us...">Bryan Thompson</a> * @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29 = 14:41:46 thompsonbry Exp $ */ public class DefaultTransactionManager { =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The maximum multi-programming level. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0private final int capacity; =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The directed graph of the WAITS_FOR relation which is = used to detect =A0=A0=A0=A0=A0* deadlocks. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0protected final TxDag waitsFor; =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The active transaction objects. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0private final Set transactions; =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The queue for each in-use resource. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* FIXME This must be a weak value hash map so that = queues may be garbage =A0=A0=A0=A0=A0* collected once they are no longer in use. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0private final Map queues =3D new HashMap(); =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The maximum multi-programming level (from the = constructor). =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0public int capacity() { =A0=A0=A0=A0=A0=A0=A0=A0return capacity; =A0=A0=A0=A0} =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The current multi-programming level. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0public int size() { =A0=A0=A0=A0=A0=A0=A0=A0return transactions.size(); =A0=A0=A0=A0} =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* Create a transaction manager. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @param capacity =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0The maximum = multiprogramming level (aka the maximum #of =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0concurrent = transactions). =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0 =A0=A0=A0=A0public DefaultTransactionManager( int capacity ) =A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0this.capacity =3D capacity; =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0waitsFor =3D new TxDag( capacity ); =A0=A0=A0=A0=A0=A0=A0=A0transactions =3D new HashSet( capacity ); =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0} =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* Create a new transaction. By default the transaction = will be bound to the =A0=A0=A0=A0=A0* thread in which this request was made. If the maximum multi-programming =A0=A0=A0=A0=A0* level would be exceeded, then this request will block = until the =A0=A0=A0=A0=A0* concurrency level has decreased. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @exception IllegalStateException =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0If the = thread is already bound to another transaction. =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0* @return The transaction. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0 =A0=A0=A0=A0public Tx createTx() =A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0return createTx( 0L, 0 ); =A0=A0=A0=A0} =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* Create a new transaction. By default the transaction = will be bound to the =A0=A0=A0=A0=A0* thread in which this request was made. If the maximum multi-programming =A0=A0=A0=A0=A0* level would be exceeded, then this request will block = until the =A0=A0=A0=A0=A0* concurrency level has decreased. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @param millis =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @param nanos =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @exception IllegalStateException =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0If the = thread is already bound to another transaction. =A0=A0=A0=A0=A0* @exception TimeoutException =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0If the = transaction could not be created in the specified =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0time. =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0* @return The transaction. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0 =A0=A0=A0=A0synchronized public Tx createTx( long millis, int nanos ) =A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0// @todo enforce multi-programming limit. = =A0e.g., using java.util.concurrent.Queue. =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0Tx tx =3D new Tx( this ); =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0transactions.add( tx ); =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0return tx; =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0} =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* Return the {@link Queue} for the identified resource. =A0=A0=A0=A0=A0* <p> =A0=A0=A0=A0=A0* Note: The {@link Queue}s are maintained in a weak = value hash map whose =A0=A0=A0=A0=A0* keys are the resource identified. Each transaction = must hold a hard =A0=A0=A0=A0=A0* reference to each queue for which it requests a lock = and must clear that =A0=A0=A0=A0=A0* hard reference once it releases its lock. Once there = are no more =A0=A0=A0=A0=A0* references to a given {@link Queue}, the garbage = collector will reclaim =A0=A0=A0=A0=A0* the queue. This is important since the number of queue = objects is bounded =A0=A0=A0=A0=A0* by the number of distinct resources in the database = and could otherwise =A0=A0=A0=A0=A0* easily exceed the available memory. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @param resource =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0The resource = identified. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @param insert =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0When true a new = {@link Queue} will be created iff none exists =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0for that resource. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @return The queue for that resource or = <code>null</code> iff =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0<code> insert =3D=3D false = </code> and there is no queue for that =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0resource. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0 =A0=A0=A0=A0synchronized protected Queue getQueue( Object resource, = boolean insert ) =A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0if( resource =3D=3D null ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalArgumentException(); =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0Queue queue =3D (Queue) queues.get(resource); =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0if( queue =3D=3D null && insert ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0queue =3D new = Queue(waitsFor,resource); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0queues.put( resource, queue ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0return queue; =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0} =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* A transaction object. A transaction object may only be = used by the thread =A0=A0=A0=A0=A0* to which it is currently bound. If the transaction is = not bound to any =A0=A0=A0=A0=A0* thread, then it must be bound to a thread before any = other operations may =A0=A0=A0=A0=A0* be taken. A transaction which is not bound to a thread = is not running and =A0=A0=A0=A0=A0* any transactions waiting on that transaction can not = run the transaction =A0=A0=A0=A0=A0* has been bound to a thread and its locks have been = released. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @author <a Bryan">href=3D"mailto:tho...@us...">Bryan Thompson</a> =A0=A0=A0=A0=A0* @version $Id: DefaultTransactionManager.java,v 1.1 = 2006/03/29 14:41:46 thompsonbry Exp $ =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @todo Define a transaction identifier, expose it, and = use it as the key =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0in {@link = DefaultTransactionManager#transactions}? =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0public static class Tx { =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* The transaction manager. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0private final DefaultTransactionManager txMgr; =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* The {@link Queue} for each lock requested = by this transaction. The =A0=A0=A0=A0=A0=A0=A0=A0=A0* queue is added to this collection before = the lock request is made and =A0=A0=A0=A0=A0=A0=A0=A0=A0* is cleared from the collection once the = lock has either been released =A0=A0=A0=A0=A0=A0=A0=A0=A0* or when a deadlock or timeout exception = was thrown at the time that =A0=A0=A0=A0=A0=A0=A0=A0=A0* the lock was requested. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0private final Set locks =3D new HashSet(); =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* The transaction manager. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0public DefaultTransactionManager = getTranasctionManager() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0return txMgr; =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Create a new transaction object. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0protected Tx( DefaultTransactionManager txMgr ) = { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( txMgr =3D=3D null ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalArgumentException(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0this.txMgr =3D txMgr; =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0setCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* A thread local variable whose value is = either the thread to which the =A0=A0=A0=A0=A0=A0=A0=A0=A0* transaction is currently bound or = <code>null</code> iff the =A0=A0=A0=A0=A0=A0=A0=A0=A0* transaction is not currently bound to any = thread. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo Review synchronization requirements = for methods that access and =A0=A0=A0=A0=A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0set this thread local = variable. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0private static ThreadLocal currentThread =3D = new ThreadLocal() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0protected synchronized Object = initialValue() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0return null; =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0}; =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Return the thread to which this = transaction is currently bound or =A0=A0=A0=A0=A0=A0=A0=A0=A0* <code>null</code> iff the transaction is = not bound to any =A0=A0=A0=A0=A0=A0=A0=A0=A0* thread. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static Thread = getCurrentThread() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0return (Thread) = currentThread.get(); =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Release the transaction from the thread of = the current context. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @exception IllegalStateException =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if the transaction is not = bound to the thread of the =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0current context. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static void = releaseCurrentThread() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0assertCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0currentThread.set(null); =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Bind the transaction to the thread of the = current context. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @exception IllegalStateException =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if the transaction is = already bound to a thread. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static void = setCurrentThread() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if (currentThread.get() !=3D null) = { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalStateException("thread already assigned"); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0currentThread.set(Thread.currentThre= ad()); =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Throw exception if the {@link Tx} is not = bound to the thread of =A0=A0=A0=A0=A0=A0=A0=A0=A0* the current context. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static void = assertCurrentThread() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if (currentThread.get() !=3D = Thread.currentThread()) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalStateException( =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= "transaction not bound to this thread"); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Request a lock on the identified resource = in the specified mode. This =A0=A0=A0=A0=A0=A0=A0=A0=A0* method will block until the lock can be = granted. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @param resource =A0=A0=A0=A0=A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0The = resource identifier. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @param mode =A0=A0=A0=A0=A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0The mode = - see {@link LockMode). =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @exception DeadlockException =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if the lock request would = cause a deadlock. =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo add timeout variant of this method. =A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo add non-blocking variant of this = method. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0public void lock( final Object resource, final = short mode ) =A0=A0=A0=A0=A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( resource =3D=3D null ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalArgumentException(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0assertCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0final Queue queue =3D = txMgr.getQueue( resource, true ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0/* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* Note: If locks are reentrant, = then we need to be careful to only =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* clear the Queue reference from = [locks] once the transaction has =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* reduced the lock counter to = zero for that queue. =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0*/=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0final boolean exists =3D locks.add( = queue ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0try { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0queue.lock( mode ); // = @todo lock( this, mode ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0catch( DeadlockException t ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( ! exists ) = locks.remove( queue ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw t; =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0catch( TimeoutException t ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( ! exists ) = locks.remove( queue ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw t; =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0catch( Throwable t ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( ! exists ) = locks.remove( queue ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = RuntimeException( t ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Release the lock on the identified = resource. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @param resource =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @exception IllegalStateException =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if the transaction does = not have a lock on that =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0resource. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0public void unlock( Object resource ) =A0=A0=A0=A0=A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( resource =3D=3D null ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalArgumentException(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0assertCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0Queue queue =3D txMgr.getQueue( = resource, true ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( queue =3D=3D null ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalStateException( =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= "no queue for resource: resource=3D" + resource); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0queue.unlock(); // @todo unlock( = this ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( ! locks.remove( queue ) ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = AssertionError(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* This method MUST be invoked to signal that = the transaction has =A0=A0=A0=A0=A0=A0=A0=A0=A0* completed its processing and is = responsible for releasing all locks =A0=A0=A0=A0=A0=A0=A0=A0=A0* and other resources associated with the = transaction. This method MUST =A0=A0=A0=A0=A0=A0=A0=A0=A0* be used whether the transaction was = successful or aborted. =A0This method =A0=A0=A0=A0=A0=A0=A0=A0=A0* also releases the bond between the = transaction and the thread which is =A0=A0=A0=A0=A0=A0=A0=A0=A0* executing that transaction. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0public void complete() =A0=A0=A0=A0=A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0/* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo Is it always possible to = call this method, e.g., on a tx =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* abort, from the thread = associated with the tx? Perhaps we need to =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* allow this method to be called = by any thread and always have it =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* release the tx - thread bond. =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0assertCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0/* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo In order to optimize = lock release behavior both this method =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* and Queue should probably = synchronize on waitsFor. This would =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* allow us to use a unlock() = variant that did not update the =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* waitsFor graph to release each = of the locks and then use the =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* optimized methods to clear the = row and column for this =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* transaction from the waitsFor = graph. =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* The code below just steps = through all of the locks that are still =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* held by this transaction = releasing them one at a time and =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* updating the waitsFor graph = each time a lock is released. =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo If we have re-entrant = locks then this code will not work =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* since the application will = have to release the locks the right =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* #of times. I really do not see = the point of re-entrant locks, =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* certainly not if they are = associated with a lock count as opposed =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* to simply returning immedately = if the lock is already held by the =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* transaction. =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0Iterator itr =3D locks.iterator(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0while( itr.hasNext() ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0Queue queue =3D (Queue) = itr.next(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0queue.unlock(); // = @todo unlock( this ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0itr.remove(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0releaseCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0} // class Tx. =A0=A0=A0=A0 } < |
From: Thompson, B. B. <BRY...@sa...> - 2006-03-31 11:58:41
|
Kevin, Thanks for looking at the code. I can certainly make the naming changes that you suggest. Queue.lock() does accept a lock context object - it was in the comments where it was invoked - and so does Queue.unlock(). I just had not propagated that change through the code and documentation yet. -bryan _____ From: jdb...@li... [mailto:jdb...@li...] On Behalf Of Kevin Day Sent: Thursday, March 30, 2006 8:02 PM To: JDBM Developer listserv Subject: [Jdbm-developer] re: DefaultTransactionManager - draft of prototype. Bryan- I've done a cursory glance through this code (this will be a heck of a lot easier when we have svn access!)... The only thing that stands out that I feel needs comments is the currentThread variable... I think that you want this to be threadLocalTx. So, the call to getCurrentThread and setCurrentThread will actually morph into getCurrentTx and setCurrentTx, etc..., as in: synchronized public static Tx getCurrentTx() { return (Tx) threadLocalTx.get(); } synchronized public static void setCurrentTx(Tx contextTx) { if (threadLocalTx.get() != null) { throw new IllegalStateException("Tx already assigned for this thread"); } threadLocalTx.set(contextTx); } /** * Throw exception if the specified lock context is not bound to the current thread. */ synchronized public static void assertCurrentTx() { if (threadLocalLockContext.get() != this) { throw new IllegalStateException("Tx context not bound to this thread"); } } I didn't see an implementation for the Queue class itself, but I suspect that the Queue.lock() method will wind up accepting a lock context object as a parameter. In jdbm, this lock context will be the transaction object returned by getCurrentTx(). Or am I maybe missing something important about how the Queue class operates? It seems to me that the items on the queue must somehow be associated with the transaction that holds the lock (or is waiting for the lock)... - K > All, Here is some code based on our discussions that attempts to integrate the 2PL support with a transaction factory and support the binding of threads to transactions. Please take a look. I have made some notes where the code is incomplete, requires features not available, etc. I am not sure where this is going right now. I could see how this could be re-factored into an extensible mechanism for creating transactions and making use of the 2PL facilities, but I am not sure that this much of the control structure belongs in a generic purpose 2PL locking package rather than in some jdbm specific transaction management classes. -bryan package org.CognitiveWeb.concurrent.locking; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; /** * A factory for transaction objects supporting assignment of threads to * transactions. A thread may assign a transaction to itself iff the transaction * is not currently bound to a thread. If a lock request would result in a * deadlock, then a {@link DeadlockException} is thrown. If a lock request * blocks, then a transaction will remain bound to that thread until the lock * request is granted, a timeout occurs, or the transaction is aborted. When a * thread is running, it may release the bound transaction. A transaction that * is not associated with any thread is not running. If other transactions are * waiting on a transaction that is not running, then those transactions will * block until the transaction is rebound to another thread and it releases its * locks. When a transaction completes (either aborts or commits), all locks * associated with that transaction are released. * * @author <a Bryan <mailto:href=> ">href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29 14:41:46 thompsonbry Exp $ */ public class DefaultTransactionManager { /** * The maximum multi-programming level. */ private final int capacity; /** * The directed graph of the WAITS_FOR relation which is used to detect * deadlocks. */ protected final TxDag waitsFor; /** * The active transaction objects. */ private final Set transactions; /** * The queue for each in-use resource. * * FIXME This must be a weak value hash map so that queues may be garbage * collected once they are no longer in use. */ private final Map queues = new HashMap(); /** * The maximum multi-programming level (from the constructor). */ public int capacity() { return capacity; } /** * The current multi-programming level. */ public int size() { return transactions.size(); } /** * Create a transaction manager. * * @param capacity * The maximum multiprogramming level (aka the maximum #of * concurrent transactions). */ public DefaultTransactionManager( int capacity ) { this.capacity = capacity; waitsFor = new TxDag( capacity ); transactions = new HashSet( capacity ); } /** * Create a new transaction. By default the transaction will be bound to the * thread in which this request was made. If the maximum multi-programming * level would be exceeded, then this request will block until the * concurrency level has decreased. * * @exception IllegalStateException * If the thread is already bound to another transaction. * * @return The transaction. */ public Tx createTx() { return createTx( 0L, 0 ); } /** * Create a new transaction. By default the transaction will be bound to the * thread in which this request was made. If the maximum multi-programming * level would be exceeded, then this request will block until the * concurrency level has decreased. * * @param millis * * @param nanos * * @exception IllegalStateException * If the thread is already bound to another transaction. * @exception TimeoutException * If the transaction could not be created in the specified * time. * * @return The transaction. */ synchronized public Tx createTx( long millis, int nanos ) { // @todo enforce multi-programming limit. e.g., using java.util.concurrent.Queue. Tx tx = new Tx( this ); transactions.add( tx ); return tx; } /** * Return the {@link Queue} for the identified resource. * <p> * Note: The {@link Queue}s are maintained in a weak value hash map whose * keys are the resource identified. Each transaction must hold a hard * reference to each queue for which it requests a lock and must clear that * hard reference once it releases its lock. Once there are no more * references to a given {@link Queue}, the garbage collector will reclaim * the queue. This is important since the number of queue objects is bounded * by the number of distinct resources in the database and could otherwise * easily exceed the available memory. * * @param resource * The resource identified. * * @param insert * When true a new {@link Queue} will be created iff none exists * for that resource. * * @return The queue for that resource or <code>null</code> iff * <code> insert == false </code> and there is no queue for that * resource. */ synchronized protected Queue getQueue( Object resource, boolean insert ) { if( resource == null ) { throw new IllegalArgumentException(); } Queue queue = (Queue) queues.get(resource); if( queue == null && insert ) { queue = new Queue(waitsFor,resource); queues.put( resource, queue ); } return queue; } /** * A transaction object. A transaction object may only be used by the thread * to which it is currently bound. If the transaction is not bound to any * thread, then it must be bound to a thread before any other operations may * be taken. A transaction which is not bound to a thread is not running and * any transactions waiting on that transaction can not run the transaction * has been bound to a thread and its locks have been released. * * @author <a Bryan <mailto:href=> ">href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29 14:41:46 thompsonbry Exp $ * * @todo Define a transaction identifier, expose it, and use it as the key * in {@link DefaultTransactionManager#transactions}? */ public static class Tx { /** * The transaction manager. */ private final DefaultTransactionManager txMgr; /** * The {@link Queue} for each lock requested by this transaction. The * queue is added to this collection before the lock request is made and * is cleared from the collection once the lock has either been released * or when a deadlock or timeout exception was thrown at the time that * the lock was requested. */ private final Set locks = new HashSet(); /** * The transaction manager. */ public DefaultTransactionManager getTranasctionManager() { return txMgr; } /** * Create a new transaction object. */ protected Tx( DefaultTransactionManager txMgr ) { if( txMgr == null ) { throw new IllegalArgumentException(); } this.txMgr = txMgr; setCurrentThread(); } /** * A thread local variable whose value is either the thread to which the * transaction is currently bound or <code>null</code> iff the * transaction is not currently bound to any thread. * * @todo Review synchronization requirements for methods that access and * set this thread local variable. */ private static ThreadLocal currentThread = new ThreadLocal() { protected synchronized Object initialValue() { return null; } }; /** * Return the thread to which this transaction is currently bound or * <code>null</code> iff the transaction is not bound to any * thread. */ synchronized public static Thread getCurrentThread() { return (Thread) currentThread.get(); } /** * Release the transaction from the thread of the current context. * * @exception IllegalStateException * if the transaction is not bound to the thread of the * current context. */ synchronized public static void releaseCurrentThread() { assertCurrentThread(); currentThread.set(null); } /** * Bind the transaction to the thread of the current context. * * @exception IllegalStateException * if the transaction is already bound to a thread. */ synchronized public static void setCurrentThread() { if (currentThread.get() != null) { throw new IllegalStateException("thread already assigned"); } currentThread.set(Thread.currentThread()); } /** * Throw exception if the {@link Tx} is not bound to the thread of * the current context. */ synchronized public static void assertCurrentThread() { if (currentThread.get() != Thread.currentThread()) { throw new IllegalStateException( "transaction not bound to this thread"); } } /** * Request a lock on the identified resource in the specified mode. This * method will block until the lock can be granted. * * @param resource * The resource identifier. * * @param mode * The mode - see {@link LockMode). * * @exception DeadlockException * if the lock request would cause a deadlock. * * @todo add timeout variant of this method. * @todo add non-blocking variant of this method. */ public void lock( final Object resource, final short mode ) { if( resource == null ) { throw new IllegalArgumentException(); } assertCurrentThread(); final Queue queue = txMgr.getQueue( resource, true ); /* * Note: If locks are reentrant, then we need to be careful to only * clear the Queue reference from [locks] once the transaction has * reduced the lock counter to zero for that queue. */ final boolean exists = locks.add( queue ); try { queue.lock( mode ); // @todo lock( this, mode ); } catch( DeadlockException t ) { if( ! exists ) locks.remove( queue ); throw t; } catch( TimeoutException t ) { if( ! exists ) locks.remove( queue ); throw t; } catch( Throwable t ) { if( ! exists ) locks.remove( queue ); throw new RuntimeException( t ); } } /** * Release the lock on the identified resource. * * @param resource * * @exception IllegalStateException * if the transaction does not have a lock on that * resource. */ public void unlock( Object resource ) { if( resource == null ) { throw new IllegalArgumentException(); } assertCurrentThread(); Queue queue = txMgr.getQueue( resource, true ); if( queue == null ) { throw new IllegalStateException( "no queue for resource: resource=" + resource); } queue.unlock(); // @todo unlock( this ); if( ! locks.remove( queue ) ) { throw new AssertionError(); } } /** * This method MUST be invoked to signal that the transaction has * completed its processing and is responsible for releasing all locks * and other resources associated with the transaction. This method MUST * be used whether the transaction was successful or aborted. This method * also releases the bond between the transaction and the thread which is * executing that transaction. */ public void complete() { /* * @todo Is it always possible to call this method, e.g., on a tx * abort, from the thread associated with the tx? Perhaps we need to * allow this method to be called by any thread and always have it * release the tx - thread bond. */ assertCurrentThread(); /* * @todo In order to optimize lock release behavior both this method * and Queue should probably synchronize on waitsFor. This would * allow us to use a unlock() variant that did not update the * waitsFor graph to release each of the locks and then use the * optimized methods to clear the row and column for this * transaction from the waitsFor graph. * * The code below just steps through all of the locks that are still * held by this transaction releasing them one at a time and * updating the waitsFor graph each time a lock is released. * * @todo If we have re-entrant locks then this code will not work * since the application will have to release the locks the right * #of times. I really do not see the point of re-entrant locks, * certainly not if they are associated with a lock count as opposed * to simply returning immedately if the lock is already held by the * transaction. */ Iterator itr = locks.iterator(); while( itr.hasNext() ) { Queue queue = (Queue) itr.next(); queue.unlock(); // @todo unlock( this ); itr.remove(); } releaseCurrentThread(); } } // class Tx. } < ------------------------------------------------------- This SF.Net email is sponsored by xPML, a groundbreaking scripting language that extends applications into web and mobile media. Attend the live webcast and join the prime developer group breaking into this new coding territory! http://sel.as-us.falkag.net/sel?cmd=lnk&kid=110944&bid=241720&dat=121642 _______________________________________________ Jdbm-developer mailing list Jdb...@li... https://lists.sourceforge.net/lists/listinfo/jdbm-developer |
From: Kevin D. <ke...@tr...> - 2006-03-31 15:08:23
|
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN"> <HTML><HEAD> <STYLE type=text/css> P, UL, OL, DL, DIR, MENU, PRE { margin: 0 auto;}</STYLE> <META content="MSHTML 6.00.2900.2802" name=GENERATOR></HEAD> <BODY leftMargin=1 topMargin=1 rightMargin=1><FONT face=Tahoma> <DIV><FONT face=Arial size=2>Bryan-</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2>Ahh - ok - if you do indeed have a lock context, then you may want to consider changing all references to the Tx class in the changes I proposed to be generic Objects. The thread local variable will become threadLocalLockContect, and you will have calls for setting and retrieving the lock context. This nicely encapsulates the locking sub-system. Consumers of that sub-system (the jdbm transaction manager, for example) would choose their lock object types as they see fit (in the transaction manager, we would probaly just choose the transaction object - but that's optional).</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2>Technically, we will need to store the transaction for the current thread in the transaction manager anyway, but I think that having a separation between the concept of a lock context and a transaction may be a good design decision.</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2>As a quasi-side point: If you implement with generic lock contexts, then a user could obtain the exact behavior as your original design by using the deginerate case of assiging the current thread as the lock context object.</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2>- K</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2> </FONT> <TABLE> <TBODY> <TR> <TD width=1 bgColor=blue><FONT face=Arial size=2></FONT></TD> <TD><FONT face=Arial size=2><FONT color=red>> <BR>Kevin, <BR> <BR>Thanks for looking at the code. I can certainly make the naming changes that you suggest. Queue.lock() does accept a lock context object it was in the comments where it was invoked and so does Queue.unlock(). I just had not propagated that change through the code and documentation yet. <BR> <BR>-bryan <BR> <BR><BR><BR>From: <A href="mailto:jdb...@li..."><FONT color=#0000ff>jdb...@li...</FONT></A> <A href="mailto:jdb...@li..."><FONT color=#0000ff>[mailto:jdb...@li...]</FONT></A> On Behalf Of Kevin Day <BR>Sent: Thursday, March 30, 2006 8:02 PM<BR>To: JDBM Developer listserv<BR>Subject: [Jdbm-developer] re: DefaultTransactionManager - draft of prototype. <BR> <BR><BR>Bryan- <BR><BR> <BR><BR>I've done a cursory glance through this code (this will be a heck of a lot easier when we have svn access!)... <BR><BR> <BR><BR>The only thing that stands out that I feel needs comments is the currentThread variable... I think that you want this to be threadLocalTx. <BR><BR> <BR><BR>So, the call to getCurrentThread and setCurrentThread will actually morph into getCurrentTx and setCurrentTx, etc..., as in: <BR><BR> <BR><BR> <BR><BR> synchronized public static Tx getCurrentTx() { <BR><BR> return (Tx) threadLocalTx.get();<BR> } <BR><BR> <BR><BR> synchronized public static void setCurrentTx(Tx contextTx) {<BR> if (threadLocalTx.get() != null) {<BR> throw new IllegalStateException("Tx already assigned for this thread");<BR> }<BR> threadLocalTx.set(contextTx);<BR> } <BR><BR> /**<BR> * Throw exception if the specified lock context is not bound to the current thread. <BR><BR> */<BR> synchronized public static void assertCurrentTx() {<BR> if (threadLocalLockContext.get() != this) {<BR> throw new IllegalStateException("Tx context not bound to this thread");<BR> }<BR> } <BR><BR> <BR><BR> <BR><BR>I didn't see an implementation for the Queue class itself, but I suspect that the Queue.lock() method will wind up accepting a lock context object as a parameter. In jdbm, this lock context will be the transaction object returned by getCurrentTx(). <BR><BR> <BR><BR>Or am I maybe missing something important about how the Queue class operates? It seems to me that the items on the queue must somehow be associated with the transaction that holds the lock (or is waiting for the lock)... <BR><BR> <BR><BR>- K <BR><BR> <BR><BR> <BR><BR> <BR><BR> <BR>> All,<BR><BR>Here is some code based on our discussions that attempts to integrate<BR>the 2PL support with a transaction factory and support the binding of<BR>threads to transactions. Please take a look. I have made some notes<BR>where the code is incomplete, requires features not available, etc.<BR><BR>I am not sure where this is going right now. I could see how this<BR>could be re-factored into an extensible mechanism for creating<BR>transactions and making use of the 2PL facilities, but I am not sure<BR>that this much of the control structure belongs in a generic purpose<BR>2PL locking package rather than in some jdbm specific transaction<BR>management classes.<BR><BR>-bryan<BR><BR>package org.CognitiveWeb.concurrent.locking;<BR><BR>import java.util.HashMap;<BR>import java.util.HashSet;<BR>import java.util.Iterator;<BR>import java.util.Map;<BR>import java.util.Set;<BR><BR>/**<BR>* A factory for transaction objects supporting assignment of threads to<BR>* transactions. A thread may assign a transaction to itself iff the<BR>transaction<BR>* is not currently bound to a thread. If a lock request would result in a<BR>* deadlock, then a {@link DeadlockException} is thrown. If a lock request<BR>* blocks, then a transaction will remain bound to that thread until the<BR>lock<BR>* request is granted, a timeout occurs, or the transaction is aborted. When<BR>a<BR>* thread is running, it may release the bound transaction. A transaction<BR>that<BR>* is not associated with any thread is not running. If other transactions<BR>are<BR>* waiting on a transaction that is not running, then those transactions<BR>will<BR>* block until the transaction is rebound to another thread and it releases<BR>its<BR>* locks. When a transaction completes (either aborts or commits), all locks<BR>* associated with that transaction are released.<BR>* <BR>* @author <a <A href="mailto:Bryan">href="mailto:tho...@us...">Bryan"><FONT color=#0000ff>Bryan">href="mailto:tho...@us...">Bryan</FONT></A><BR>Thompson</a><BR>* @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29 14:41:46<BR>thompsonbry Exp $<BR>*/<BR><BR>public class DefaultTransactionManager<BR>{<BR> <BR> /**<BR> * The maximum multi-programming level.<BR> */<BR> private final int capacity;<BR> <BR> /**<BR> * The directed graph of the WAITS_FOR relation which is used to<BR>detect<BR> * deadlocks.<BR> */<BR> protected final TxDag waitsFor;<BR><BR> /**<BR> * The active transaction objects.<BR> */<BR> private final Set transactions;<BR><BR> /**<BR> * The queue for each in-use resource.<BR> * <BR> * FIXME This must be a weak value hash map so that queues may be<BR>garbage<BR> * collected once they are no longer in use.<BR> */<BR> private final Map queues = new HashMap();<BR> <BR> /**<BR> * The maximum multi-programming level (from the constructor).<BR> */<BR> public int capacity() {<BR> return capacity;<BR> }<BR><BR> /**<BR> * The current multi-programming level.<BR> */<BR> public int size() {<BR> return transactions.size();<BR> }<BR> <BR> /**<BR> * Create a transaction manager.<BR> * <BR> * @param capacity<BR> * The maximum multiprogramming level (aka the maximum<BR>#of<BR> * concurrent transactions).<BR> */<BR> <BR> public DefaultTransactionManager( int capacity )<BR> {<BR> <BR> this.capacity = capacity;<BR> <BR> waitsFor = new TxDag( capacity );<BR><BR> transactions = new HashSet( capacity );<BR> <BR> }<BR><BR> /**<BR> * Create a new transaction. By default the transaction will be<BR>bound to the<BR> * thread in which this request was made. If the maximum<BR>multi-programming<BR> * level would be exceeded, then this request will block until the<BR> * concurrency level has decreased.<BR> * <BR> * @exception IllegalStateException<BR> * If the thread is already bound to another<BR>transaction.<BR> * <BR> * @return The transaction.<BR> */<BR> <BR> public Tx createTx()<BR> {<BR> <BR> return createTx( 0L, 0 );<BR><BR> }<BR> <BR> /**<BR> * Create a new transaction. By default the transaction will be<BR>bound to the<BR> * thread in which this request was made. If the maximum<BR>multi-programming<BR> * level would be exceeded, then this request will block until the<BR> * concurrency level has decreased.<BR> * <BR> * @param millis<BR> * <BR> * @param nanos<BR> * <BR> * @exception IllegalStateException<BR> * If the thread is already bound to another<BR>transaction.<BR> * @exception TimeoutException<BR> * If the transaction could not be created in the<BR>specified<BR> * time.<BR> * <BR> * @return The transaction.<BR> */<BR> <BR> synchronized public Tx createTx( long millis, int nanos )<BR> {<BR> <BR> // @todo enforce multi-programming limit. e.g., using<BR>java.util.concurrent.Queue.<BR> <BR> Tx tx = new Tx( this );<BR> <BR> transactions.add( tx );<BR> <BR> return tx;<BR> <BR> }<BR><BR> /**<BR> * Return the {@link Queue} for the identified resource.<BR> * <p><BR> * Note: The {@link Queue}s are maintained in a weak value hash map<BR>whose<BR> * keys are the resource identified. Each transaction must hold a<BR>hard<BR> * reference to each queue for which it requests a lock and must<BR>clear that<BR> * hard reference once it releases its lock. Once there are no more<BR> * references to a given {@link Queue}, the garbage collector will<BR>reclaim<BR> * the queue. This is important since the number of queue objects is<BR>bounded<BR> * by the number of distinct resources in the database and could<BR>otherwise<BR> * easily exceed the available memory.<BR> * <BR> * @param resource<BR> * The resource identified.<BR> * <BR> * @param insert<BR> * When true a new {@link Queue} will be created iff none<BR>exists<BR> * for that resource.<BR> * <BR> * @return The queue for that resource or <code>null</code> iff<BR> * <code> insert == false </code> and there is no queue for<BR>that<BR> * resource.<BR> */<BR> <BR> synchronized protected Queue getQueue( Object resource, boolean<BR>insert )<BR> {<BR> <BR> if( resource == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR> <BR> Queue queue = (Queue) queues.get(resource);<BR> <BR> if( queue == null && insert ) {<BR> <BR> queue = new Queue(waitsFor,resource);<BR> <BR> queues.put( resource, queue );<BR> <BR> }<BR> <BR> return queue;<BR> <BR> }<BR> <BR> /**<BR> * A transaction object. A transaction object may only be used by<BR>the thread<BR> * to which it is currently bound. If the transaction is not bound<BR>to any<BR> * thread, then it must be bound to a thread before any other<BR>operations may<BR> * be taken. A transaction which is not bound to a thread is not<BR>running and<BR> * any transactions waiting on that transaction can not run the<BR>transaction<BR> * has been bound to a thread and its locks have been released.<BR> * <BR> * @author <a <A href="mailto:Bryan">href="mailto:tho...@us...">Bryan"><FONT color=#0000ff>Bryan">href="mailto:tho...@us...">Bryan</FONT></A><BR>Thompson</a><BR> * @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29<BR>14:41:46 thompsonbry Exp $<BR> * <BR> * @todo Define a transaction identifier, expose it, and use it as<BR>the key<BR> * in {@link DefaultTransactionManager#transactions}?<BR> */<BR><BR> public static class Tx {<BR><BR> /**<BR> * The transaction manager.<BR> */<BR> private final DefaultTransactionManager txMgr;<BR><BR> /**<BR> * The {@link Queue} for each lock requested by this<BR>transaction. The<BR> * queue is added to this collection before the lock request<BR>is made and<BR> * is cleared from the collection once the lock has either<BR>been released<BR> * or when a deadlock or timeout exception was thrown at the<BR>time that<BR> * the lock was requested.<BR> */<BR> private final Set locks = new HashSet();<BR> <BR> /**<BR> * The transaction manager.<BR> */<BR><BR> public DefaultTransactionManager getTranasctionManager() {<BR> return txMgr;<BR> }<BR><BR> /**<BR> * Create a new transaction object.<BR> */<BR> protected Tx( DefaultTransactionManager txMgr ) {<BR> if( txMgr == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR> this.txMgr = txMgr;<BR> setCurrentThread();<BR> }<BR> <BR> /**<BR> * A thread local variable whose value is either the thread<BR>to which the<BR> * transaction is currently bound or <code>null</code> iff<BR>the<BR> * transaction is not currently bound to any thread.<BR> * <BR> * @todo Review synchronization requirements for methods<BR>that access and<BR> * set this thread local variable.<BR> */<BR> private static ThreadLocal currentThread = new ThreadLocal()<BR>{<BR> protected synchronized Object initialValue() {<BR> return null;<BR> }<BR> };<BR><BR> /**<BR> * Return the thread to which this transaction is currently<BR>bound or<BR> * <code>null</code> iff the transaction is not bound to any<BR> * thread.<BR> */<BR> synchronized public static Thread getCurrentThread() {<BR> return (Thread) currentThread.get();<BR> }<BR><BR> /**<BR> * Release the transaction from the thread of the current<BR>context.<BR> * <BR> * @exception IllegalStateException<BR> * if the transaction is not bound to the<BR>thread of the<BR> * current context.<BR> */<BR> synchronized public static void releaseCurrentThread() {<BR> assertCurrentThread();<BR> currentThread.set(null);<BR> }<BR><BR> /**<BR> * Bind the transaction to the thread of the current<BR>context.<BR> * <BR> * @exception IllegalStateException<BR> * if the transaction is already bound to a<BR>thread.<BR> */<BR> synchronized public static void setCurrentThread() {<BR> if (currentThread.get() != null) {<BR> throw new IllegalStateException("thread<BR>already assigned");<BR> }<BR> currentThread.set(Thread.currentThread());<BR> }<BR><BR> /**<BR> * Throw exception if the {@link Tx} is not bound to the<BR>thread of<BR> * the current context.<BR> */<BR> synchronized public static void assertCurrentThread() {<BR> if (currentThread.get() != Thread.currentThread()) {<BR> throw new IllegalStateException(<BR> "transaction not bound to<BR>this thread");<BR> }<BR> }<BR><BR> /**<BR> * Request a lock on the identified resource in the<BR>specified mode. This<BR> * method will block until the lock can be granted.<BR> * <BR> * @param resource<BR> * The resource identifier.<BR> * <BR> * @param mode<BR> * The mode - see {@link LockMode).<BR> * <BR> * @exception DeadlockException<BR> * if the lock request would cause a<BR>deadlock.<BR> * <BR> * @todo add timeout variant of this method.<BR> * @todo add non-blocking variant of this method.<BR> */<BR> <BR> public void lock( final Object resource, final short mode )<BR> {<BR> <BR> if( resource == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR><BR> assertCurrentThread();<BR><BR> final Queue queue = txMgr.getQueue( resource, true<BR>);<BR> <BR> /*<BR> * Note: If locks are reentrant, then we need to be<BR>careful to only<BR> * clear the Queue reference from [locks] once the<BR>transaction has<BR> * reduced the lock counter to zero for that queue.<BR> */ <BR> final boolean exists = locks.add( queue );<BR> <BR> try {<BR> queue.lock( mode ); // @todo lock( this,<BR>mode );<BR> }<BR> catch( DeadlockException t ) {<BR> if( ! exists ) locks.remove( queue );<BR> throw t;<BR> }<BR> catch( TimeoutException t ) {<BR> if( ! exists ) locks.remove( queue );<BR> throw t;<BR> }<BR> catch( Throwable t ) {<BR> if( ! exists ) locks.remove( queue );<BR> throw new RuntimeException( t );<BR> }<BR> <BR> }<BR><BR> /**<BR> * Release the lock on the identified resource.<BR> * <BR> * @param resource<BR> * <BR> * @exception IllegalStateException<BR> * if the transaction does not have a lock on<BR>that<BR> * resource.<BR> */<BR> <BR> public void unlock( Object resource )<BR> {<BR> <BR> if( resource == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR> <BR> assertCurrentThread();<BR> <BR> Queue queue = txMgr.getQueue( resource, true );<BR> <BR> if( queue == null ) {<BR> <BR> throw new IllegalStateException(<BR> "no queue for resource:<BR>resource=" + resource);<BR> <BR> }<BR><BR> queue.unlock(); // @todo unlock( this );<BR><BR><BR> if( ! locks.remove( queue ) ) {<BR> <BR> throw new AssertionError();<BR> <BR> }<BR> <BR> }<BR> <BR> /**<BR> * This method MUST be invoked to signal that the<BR>transaction has<BR> * completed its processing and is responsible for releasing<BR>all locks<BR> * and other resources associated with the transaction. This<BR>method MUST<BR> * be used whether the transaction was successful or<BR>aborted. This method<BR> * also releases the bond between the transaction and the<BR>thread which is<BR> * executing that transaction.<BR> */<BR> <BR> public void complete()<BR> {<BR><BR> /*<BR> * @todo Is it always possible to call this method,<BR>e.g., on a tx<BR> * abort, from the thread associated with the tx?<BR>Perhaps we need to<BR> * allow this method to be called by any thread and<BR>always have it<BR> * release the tx - thread bond.<BR> */<BR> assertCurrentThread();<BR> <BR> /*<BR> * @todo In order to optimize lock release behavior<BR>both this method<BR> * and Queue should probably synchronize on<BR>waitsFor. This would<BR> * allow us to use a unlock() variant that did not<BR>update the<BR> * waitsFor graph to release each of the locks and<BR>then use the<BR> * optimized methods to clear the row and column for<BR>this<BR> * transaction from the waitsFor graph.<BR> * <BR> * The code below just steps through all of the<BR>locks that are still<BR> * held by this transaction releasing them one at a<BR>time and<BR> * updating the waitsFor graph each time a lock is<BR>released.<BR> * <BR> * @todo If we have re-entrant locks then this code<BR>will not work<BR> * since the application will have to release the<BR>locks the right<BR> * #of times. I really do not see the point of<BR>re-entrant locks,<BR> * certainly not if they are associated with a lock<BR>count as opposed<BR> * to simply returning immedately if the lock is<BR>already held by the<BR> * transaction.<BR> */<BR> <BR> Iterator itr = locks.iterator();<BR> <BR> while( itr.hasNext() ) {<BR> <BR> Queue queue = (Queue) itr.next();<BR> <BR> queue.unlock(); // @todo unlock( this );<BR><BR><BR> itr.remove();<BR> <BR> }<BR><BR> releaseCurrentThread();<BR> <BR> }<BR> <BR> } // class Tx.<BR> <BR>}<BR><BR><<BR><<BR></FONT></FONT></TD></TR></TBODY></TABLE></DIV></FONT></BODY></HTML> |
From: Kevin D. <ke...@tr...> - 2006-03-31 21:07:11
|
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN"> <HTML><HEAD> <STYLE type=text/css> P, UL, OL, DL, DIR, MENU, PRE { margin: 0 auto;}</STYLE> <META content="MSHTML 6.00.2900.2802" name=GENERATOR></HEAD> <BODY leftMargin=1 topMargin=1 rightMargin=1><FONT face=Tahoma> <DIV><FONT face=Arial size=2>Bryan-</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2></FONT></DIV> <DIV><FONT face=Arial size=2>Ahh - I think there me be some misunderstanding as to what the thread local variable is doing...</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2>In the original code, it looks like you are setting the value of the thread local value to Thread.currentThread() - the changes I proposed changed this (this what actually the most significant change I suggested - the renaming was just fluff - but the following change is critical):</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2><FONT face=Tahoma> synchronized public static void setCurrentTx(Tx contextTx) {<BR> if (threadLocalTx.get() != null) {<BR> throw new IllegalStateException("Tx already assigned for this thread");<BR> }<BR> threadLocalTx.<FONT color=#ff0000><STRONG>set(contextTx);</STRONG></FONT><BR> }</FONT><BR> <DIV><FONT size=2></FONT> </DIV> <DIV>Note that I'm setting the thread local variable as the lock context, not the current thread.</DIV> <DIV> </DIV> <DIV> </DIV> <DIV>To understand why I made this change, I think it may help to chat about how thread local storage actually works (if you already have a solid understanding on this, please excuse me and move on :-) ):</DIV> <DIV> </DIV> <DIV>A basic thread local storage mechanism is actually a special type of Map (I'll call it a hash table from here on out) that has Thread objects as it's keys. When you go to use a 'thread local' value, what you are really doing is performing the following call aMap.get(Thread.currentThread()). At a conceptual level, this can be very powerful, because it allows you to maintain completely separate variable values between different threads.</DIV> <DIV> </DIV> <DIV>In the case of the locking sub-system, you have the concept of a lock context - all locks obtained or released by a given thread take place in that thread's lock context. At a basic level, you could just use the current Thread object itself as the lock context. In fact, this is what basic Java synchronization does.</DIV> <DIV> </DIV> <DIV>Alex's request was to improve on this by allowing a given lock context to be passed from one thread to another. In the most extreme case, this would require that multiple threads be able to simultaneously perform lock operations under the same context. That would be an absolute bugger to design and program - and, quite frankly, it just isn't needed for most application development. So Alex proposed a restriction on 'portable' lock contexts - namely, that a lock context can be active in at most one thread at the same time.</DIV> <DIV> </DIV> <DIV>The use of a thread local variable to manage the active lock context of the current thread is a very elegant solution, and it works something like this:</DIV> <DIV> </DIV> <DIV>When a thread aquires a lock context, it must meet the following conditions:</DIV> <DIV> </DIV> <DIV>1. That thread must not already have a lock context</DIV> <DIV>2. That lock context must not already belong to another thread</DIV> <DIV> </DIV> <DIV>If both of those conditions are met, then the system has to do some book keeping to make sure that no other threads can violate those requirements. So, we do the following:</DIV> <DIV> </DIV> <DIV>1. We set the thread local current lock context to the lock context the thread is aquiring</DIV> <DIV>2. We mark the lock context as belonging the current thread</DIV> <DIV> </DIV> <DIV>From that point forward, all locking operations performed in that thread will occur in the lock context assigned to that thread - until the thread releases the lock context, at which point, it is free to set a different lock context. Note that all locks continue to be held, even if a lock context is not actually owned by a thread...</DIV> <DIV> </DIV> <DIV> </DIV> <DIV>Now, there may be a clever way of ensuring #2 without having to explicitly set the current thread in the lock context object - doing so would completely encapsulate the locking sub-system. I'd like to hear if Alex has any ideas on that, because something in the back of my mind is telling me that this condition may be met implicitly by meeting #1...</DIV> <DIV> </DIV> <DIV>Otherwise, we are talking about a weak hashmap for tracking lock context -> thread mapping - that's certainly doable, but it would be nice if there was a more elgegant solution.</DIV> <DIV> </DIV> <DIV>- K</DIV> <DIV> </DIV> <DIV> </DIV> <DIV> </DIV> <DIV> </DIV></FONT></DIV> <DIV><FONT face=Arial size=2> </FONT> <TABLE> <TBODY> <TR> <TD width=1 bgColor=blue><FONT face=Arial size=2></FONT></TD> <TD><FONT face=Arial size=2><FONT color=red>> Kevin,<BR><BR>I sent an earlier email that is hung up waiting on approval in which I<BR>agreed with you. However, I am now looking at the code [1] and I am much<BR>less sure. This may be simply that I am unfamiliar with the use of<BR>ThreadLocal variables, but the point is to assign on the transaction object<BR>the reference to the thread which is allowed to invoke methods on the<BR>transaction object API, right? That is exactly what I believe the code is<BR>doing, and I believe that the field and method names reflect this.<BR><BR>The inner Tx class begins around line# 247. Please take a look at this<BR>again. If this is not what we discussed, then I do not think that I<BR>understand how to apply the thread local variable properly.<BR><BR>With reference to Queue.lock() [line# 377] and Queue.unlock() [line# 422],<BR>there are comments in the code that the transaction object needs to be<BR>passed into each of these methods. I am working on an update to the 2PL<BR>module that will support this.<BR><BR>Thanks,<BR><BR>-bryan<BR><BR>[1]<BR><A href="http://proto.cognitiveweb.org/projects/cweb/multiproject/cweb-concurrent/xre"><FONT color=#0000ff>http://proto.cognitiveweb.org/projects/cweb/multiproject/cweb-concurrent/xre</FONT></A><BR>f/org/CognitiveWeb/concurrent/locking/DefaultTransactionManager.html<BR><BR>________________________________________<BR>From: <A href="mailto:jdb...@li..."><FONT color=#0000ff>jdb...@li...</FONT></A><BR><A href="mailto:jdb...@li..."><FONT color=#0000ff>[mailto:jdb...@li...]</FONT></A> On Behalf Of Kevin Day <BR>Sent: Thursday, March 30, 2006 8:02 PM<BR>To: JDBM Developer listserv<BR>Subject: [Jdbm-developer] re: DefaultTransactionManager - draft of<BR>prototype.<BR><BR>Bryan-<BR> <BR>I've done a cursory glance through this code (this will be a heck of a lot<BR>easier when we have svn access!)...<BR> <BR>The only thing that stands out that I feel needs comments is the<BR>currentThread variable... I think that you want this to be threadLocalTx.<BR> <BR>So, the call to getCurrentThread and setCurrentThread will actually morph<BR>into getCurrentTx and setCurrentTx, etc..., as in:<BR> <BR> <BR> synchronized public static Tx getCurrentTx() {<BR> return (Tx) threadLocalTx.get();<BR> }<BR> <BR> synchronized public static void setCurrentTx(Tx contextTx) {<BR> if (threadLocalTx.get() != null) {<BR> throw new IllegalStateException("Tx already assigned for<BR>this thread");<BR> }<BR> threadLocalTx.set(contextTx);<BR> }<BR> /**<BR> * Throw exception if the specified lock context is not bound to the<BR>current thread.<BR> */<BR> synchronized public static void assertCurrentTx() {<BR> if (threadLocalLockContext.get() != this) {<BR> throw new IllegalStateException("Tx context not bound to<BR>this thread");<BR> }<BR> }<BR> <BR> <BR>I didn't see an implementation for the Queue class itself, but I suspect<BR>that the Queue.lock() method will wind up accepting a lock context object as<BR>a parameter. In jdbm, this lock context will be the transaction object<BR>returned by getCurrentTx().<BR> <BR>Or am I maybe missing something important about how the Queue class<BR>operates? It seems to me that the items on the queue must somehow be<BR>associated with the transaction that holds the lock (or is waiting for the<BR>lock)...<BR> <BR>- K<BR> <BR> <BR> <BR><BR>> All,<BR><BR>Here is some code based on our discussions that attempts to integrate<BR>the 2PL support with a transaction factory and support the binding of<BR>threads to transactions. Please take a look. I have made some notes<BR>where the code is incomplete, requires features not available, etc.<BR><BR>I am not sure where this is going right now. I could see how this<BR>could be re-factored into an extensible mechanism for creating<BR>transactions and making use of the 2PL facilities, but I am not sure<BR>that this much of the control structure belongs in a generic purpose<BR>2PL locking package rather than in some jdbm specific transaction<BR>management classes.<BR><BR>-bryan<BR><BR>package org.CognitiveWeb.concurrent.locking;<BR><BR>import java.util.HashMap;<BR>import java.util.HashSet;<BR>import java.util.Iterator;<BR>import java.util.Map;<BR>import java.util.Set;<BR><BR>/**<BR>* A factory for transaction objects supporting assignment of threads to<BR>* transactions. A thread may assign a transaction to itself iff the<BR>transaction<BR>* is not currently bound to a thread. If a lock request would result in a<BR>* deadlock, then a {@link DeadlockException} is thrown. If a lock request<BR>* blocks, then a transaction will remain bound to that thread until the<BR>lock<BR>* request is granted, a timeout occurs, or the transaction is aborted. When<BR>a<BR>* thread is running, it may release the bound transaction. A transaction<BR>that<BR>* is not associated with any thread is not running. If other transactions<BR>are<BR>* waiting on a transaction that is not running, then those transactions<BR>will<BR>* block until the transaction is rebound to another thread and it releases<BR>its<BR>* locks. When a transaction completes (either aborts or commits), all locks<BR>* associated with that transaction are released.<BR>* <BR>* @author <a <A href="mailto:Bryan">href="mailto:tho...@us...">Bryan"><FONT color=#0000ff>Bryan">href="mailto:tho...@us...">Bryan</FONT></A><BR>Thompson</a><BR>* @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29 14:41:46<BR>thompsonbry Exp $<BR>*/<BR><BR>public class DefaultTransactionManager<BR>{<BR> <BR> /**<BR> * The maximum multi-programming level.<BR> */<BR> private final int capacity;<BR> <BR> /**<BR> * The directed graph of the WAITS_FOR relation which is used to<BR>detect<BR> * deadlocks.<BR> */<BR> protected final TxDag waitsFor;<BR><BR> /**<BR> * The active transaction objects.<BR> */<BR> private final Set transactions;<BR><BR> /**<BR> * The queue for each in-use resource.<BR> * <BR> * FIXME This must be a weak value hash map so that queues may be<BR>garbage<BR> * collected once they are no longer in use.<BR> */<BR> private final Map queues = new HashMap();<BR> <BR> /**<BR> * The maximum multi-programming level (from the constructor).<BR> */<BR> public int capacity() {<BR> return capacity;<BR> }<BR><BR> /**<BR> * The current multi-programming level.<BR> */<BR> public int size() {<BR> return transactions.size();<BR> }<BR> <BR> /**<BR> * Create a transaction manager.<BR> * <BR> * @param capacity<BR> * The maximum multiprogramming level (aka the maximum<BR>#of<BR> * concurrent transactions).<BR> */<BR> <BR> public DefaultTransactionManager( int capacity )<BR> {<BR> <BR> this.capacity = capacity;<BR> <BR> waitsFor = new TxDag( capacity );<BR><BR> transactions = new HashSet( capacity );<BR> <BR> }<BR><BR> /**<BR> * Create a new transaction. By default the transaction will be<BR>bound to the<BR> * thread in which this request was made. If the maximum<BR>multi-programming<BR> * level would be exceeded, then this request will block until the<BR> * concurrency level has decreased.<BR> * <BR> * @exception IllegalStateException<BR> * If the thread is already bound to another<BR>transaction.<BR> * <BR> * @return The transaction.<BR> */<BR> <BR> public Tx createTx()<BR> {<BR> <BR> return createTx( 0L, 0 );<BR><BR> }<BR> <BR> /**<BR> * Create a new transaction. By default the transaction will be<BR>bound to the<BR> * thread in which this request was made. If the maximum<BR>multi-programming<BR> * level would be exceeded, then this request will block until the<BR> * concurrency level has decreased.<BR> * <BR> * @param millis<BR> * <BR> * @param nanos<BR> * <BR> * @exception IllegalStateException<BR> * If the thread is already bound to another<BR>transaction.<BR> * @exception TimeoutException<BR> * If the transaction could not be created in the<BR>specified<BR> * time.<BR> * <BR> * @return The transaction.<BR> */<BR> <BR> synchronized public Tx createTx( long millis, int nanos )<BR> {<BR> <BR> // @todo enforce multi-programming limit. e.g., using<BR>java.util.concurrent.Queue.<BR> <BR> Tx tx = new Tx( this );<BR> <BR> transactions.add( tx );<BR> <BR> return tx;<BR> <BR> }<BR><BR> /**<BR> * Return the {@link Queue} for the identified resource.<BR> * <p><BR> * Note: The {@link Queue}s are maintained in a weak value hash map<BR>whose<BR> * keys are the resource identified. Each transaction must hold a<BR>hard<BR> * reference to each queue for which it requests a lock and must<BR>clear that<BR> * hard reference once it releases its lock. Once there are no more<BR> * references to a given {@link Queue}, the garbage collector will<BR>reclaim<BR> * the queue. This is important since the number of queue objects is<BR>bounded<BR> * by the number of distinct resources in the database and could<BR>otherwise<BR> * easily exceed the available memory.<BR> * <BR> * @param resource<BR> * The resource identified.<BR> * <BR> * @param insert<BR> * When true a new {@link Queue} will be created iff none<BR>exists<BR> * for that resource.<BR> * <BR> * @return The queue for that resource or <code>null</code> iff<BR> * <code> insert == false </code> and there is no queue for<BR>that<BR> * resource.<BR> */<BR> <BR> synchronized protected Queue getQueue( Object resource, boolean<BR>insert )<BR> {<BR> <BR> if( resource == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR> <BR> Queue queue = (Queue) queues.get(resource);<BR> <BR> if( queue == null && insert ) {<BR> <BR> queue = new Queue(waitsFor,resource);<BR> <BR> queues.put( resource, queue );<BR> <BR> }<BR> <BR> return queue;<BR> <BR> }<BR> <BR> /**<BR> * A transaction object. A transaction object may only be used by<BR>the thread<BR> * to which it is currently bound. If the transaction is not bound<BR>to any<BR> * thread, then it must be bound to a thread before any other<BR>operations may<BR> * be taken. A transaction which is not bound to a thread is not<BR>running and<BR> * any transactions waiting on that transaction can not run the<BR>transaction<BR> * has been bound to a thread and its locks have been released.<BR> * <BR> * @author <a<BR><A href="mailto:Bryan">href="mailto:tho...@us...">Bryan"><FONT color=#0000ff>Bryan">href="mailto:tho...@us...">Bryan</FONT></A><BR>Thompson</a><BR> * @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29<BR>14:41:46 thompsonbry Exp $<BR> * <BR> * @todo Define a transaction identifier, expose it, and use it as<BR>the key<BR> * in {@link DefaultTransactionManager#transactions}?<BR> */<BR><BR> public static class Tx {<BR><BR> /**<BR> * The transaction manager.<BR> */<BR> private final DefaultTransactionManager txMgr;<BR><BR> /**<BR> * The {@link Queue} for each lock requested by this<BR>transaction. The<BR> * queue is added to this collection before the lock request<BR>is made and<BR> * is cleared from the collection once the lock has either<BR>been released<BR> * or when a deadlock or timeout exception was thrown at the<BR>time that<BR> * the lock was requested.<BR> */<BR> private final Set locks = new HashSet();<BR> <BR> /**<BR> * The transaction manager.<BR> */<BR><BR> public DefaultTransactionManager getTranasctionManager() {<BR> return txMgr;<BR> }<BR><BR> /**<BR> * Create a new transaction object.<BR> */<BR> protected Tx( DefaultTransactionManager txMgr ) {<BR> if( txMgr == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR> this.txMgr = txMgr;<BR> setCurrentThread();<BR> }<BR> <BR> /**<BR> * A thread local variable whose value is either the thread<BR>to which the<BR> * transaction is currently bound or <code>null</code> iff<BR>the<BR> * transaction is not currently bound to any thread.<BR> * <BR> * @todo Review synchronization requirements for methods<BR>that access and<BR> * set this thread local variable.<BR> */<BR> private static ThreadLocal currentThread = new ThreadLocal()<BR>{<BR> protected synchronized Object initialValue() {<BR> return null;<BR> }<BR> };<BR><BR> /**<BR> * Return the thread to which this transaction is currently<BR>bound or<BR> * <code>null</code> iff the transaction is not bound to any<BR> * thread.<BR> */<BR> synchronized public static Thread getCurrentThread() {<BR> return (Thread) currentThread.get();<BR> }<BR><BR> /**<BR> * Release the transaction from the thread of the current<BR>context.<BR> * <BR> * @exception IllegalStateException<BR> * if the transaction is not bound to the<BR>thread of the<BR> * current context.<BR> */<BR> synchronized public static void releaseCurrentThread() {<BR> assertCurrentThread();<BR> currentThread.set(null);<BR> }<BR><BR> /**<BR> * Bind the transaction to the thread of the current<BR>context.<BR> * <BR> * @exception IllegalStateException<BR> * if the transaction is already bound to a<BR>thread.<BR> */<BR> synchronized public static void setCurrentThread() {<BR> if (currentThread.get() != null) {<BR> throw new IllegalStateException("thread<BR>already assigned");<BR> }<BR> currentThread.set(Thread.currentThread());<BR> }<BR><BR> /**<BR> * Throw exception if the {@link Tx} is not bound to the<BR>thread of<BR> * the current context.<BR> */<BR> synchronized public static void assertCurrentThread() {<BR> if (currentThread.get() != Thread.currentThread()) {<BR> throw new IllegalStateException(<BR> "transaction not bound to<BR>this thread");<BR> }<BR> }<BR><BR> /**<BR> * Request a lock on the identified resource in the<BR>specified mode. This<BR> * method will block until the lock can be granted.<BR> * <BR> * @param resource<BR> * The resource identifier.<BR> * <BR> * @param mode<BR> * The mode - see {@link LockMode).<BR> * <BR> * @exception DeadlockException<BR> * if the lock request would cause a<BR>deadlock.<BR> * <BR> * @todo add timeout variant of this method.<BR> * @todo add non-blocking variant of this method.<BR> */<BR> <BR> public void lock( final Object resource, final short mode )<BR> {<BR> <BR> if( resource == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR><BR> assertCurrentThread();<BR><BR> final Queue queue = txMgr.getQueue( resource, true<BR>);<BR> <BR> /*<BR> * Note: If locks are reentrant, then we need to be<BR>careful to only<BR> * clear the Queue reference from [locks] once the<BR>transaction has<BR> * reduced the lock counter to zero for that queue.<BR> */ <BR> final boolean exists = locks.add( queue );<BR> <BR> try {<BR> queue.lock( mode ); // @todo lock( this,<BR>mode );<BR> }<BR> catch( DeadlockException t ) {<BR> if( ! exists ) locks.remove( queue );<BR> throw t;<BR> }<BR> catch( TimeoutException t ) {<BR> if( ! exists ) locks.remove( queue );<BR> throw t;<BR> }<BR> catch( Throwable t ) {<BR> if( ! exists ) locks.remove( queue );<BR> throw new RuntimeException( t );<BR> }<BR> <BR> }<BR><BR> /**<BR> * Release the lock on the identified resource.<BR> * <BR> * @param resource<BR> * <BR> * @exception IllegalStateException<BR> * if the transaction does not have a lock on<BR>that<BR> * resource.<BR> */<BR> <BR> public void unlock( Object resource )<BR> {<BR> <BR> if( resource == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR> <BR> assertCurrentThread();<BR> <BR> Queue queue = txMgr.getQueue( resource, true );<BR> <BR> if( queue == null ) {<BR> <BR> throw new IllegalStateException(<BR> "no queue for resource:<BR>resource=" + resource);<BR> <BR> }<BR><BR> queue.unlock(); // @todo unlock( this );<BR><BR><BR> if( ! locks.remove( queue ) ) {<BR> <BR> throw new AssertionError();<BR> <BR> }<BR> <BR> }<BR> <BR> /**<BR> * This method MUST be invoked to signal that the<BR>transaction has<BR> * completed its processing and is responsible for releasing<BR>all locks<BR> * and other resources associated with the transaction. This<BR>method MUST<BR> * be used whether the transaction was successful or<BR>aborted. This method<BR> * also releases the bond between the transaction and the<BR>thread which is<BR> * executing that transaction.<BR> */<BR> <BR> public void complete()<BR> {<BR><BR> /*<BR> * @todo Is it always possible to call this method,<BR>e.g., on a tx<BR> * abort, from the thread associated with the tx?<BR>Perhaps we need to<BR> * allow this method to be called by any thread and<BR>always have it<BR> * release the tx - thread bond.<BR> */<BR> assertCurrentThread();<BR> <BR> /*<BR> * @todo In order to optimize lock release behavior<BR>both this method<BR> * and Queue should probably synchronize on<BR>waitsFor. This would<BR> * allow us to use a unlock() variant that did not<BR>update the<BR> * waitsFor graph to release each of the locks and<BR>then use the<BR> * optimized methods to clear the row and column for<BR>this<BR> * transaction from the waitsFor graph.<BR> * <BR> * The code below just steps through all of the<BR>locks that are still<BR> * held by this transaction releasing them one at a<BR>time and<BR> * updating the waitsFor graph each time a lock is<BR>released.<BR> * <BR> * @todo If we have re-entrant locks then this code<BR>will not work<BR> * since the application will have to release the<BR>locks the right<BR> * #of times. I really do not see the point of<BR>re-entrant locks,<BR> * certainly not if they are associated with a lock<BR>count as opposed<BR> * to simply returning immedately if the lock is<BR>already held by the<BR> * transaction.<BR> */<BR> <BR> Iterator itr = locks.iterator();<BR> <BR> while( itr.hasNext() ) {<BR> <BR> Queue queue = (Queue) itr.next();<BR> <BR> queue.unlock(); // @todo unlock( this );<BR><BR><BR> itr.remove();<BR> <BR> }<BR><BR> releaseCurrentThread();<BR> <BR> }<BR> <BR> } // class Tx.<BR> <BR>}<BR><BR><<BR><BR><BR><BR>-------------------------------------------------------<BR>This SF.Net email is sponsored by xPML, a groundbreaking scripting language<BR>that extends applications into web and mobile media. Attend the live webcast<BR>and join the prime developer group breaking into this new coding territory!<BR><A href="http://sel.as-us.falkag.net/sel?cmd"><FONT color=#0000ff>http://sel.as-us.falkag.net/sel?cmd</FONT></A><BR><BR><<BR></FONT></FONT></TD></TR></TBODY></TABLE></DIV></FONT></BODY></HTML> |