From: Thompson, B. B. <BRY...@sa...> - 2006-04-03 12:28:34
|
Resend as plain text to reduce message size. -bryan ________________________________________ From: Thompson, Bryan B.=20 Sent: Monday, April 03, 2006 8:19 AM To: 'Kevin Day'; JDBM Developer listserv Subject: RE: re[2]: [Jdbm-developer] re[2]: DefaultTransactionManager - draft of prototype. Kevin, No problem.=A0 I will update the 2PL module along the lines that you = described and we can reconcile later.=A0 It seems to me that the transaction API = for jdbm2 will need to abstract all of this away from the underlying record manager (so that we can have more than one design) and away from the concurrency control logic (so that we can have more than one approach). I am curious whether the concept of a "lock context" in the 2PL module = makes more or less sense than just calling this a "transaction".=A0 Locking = clearly supports only one property for transactions (isolation), but the notion = of lock context may be too far removed from "transaction".=A0 What do you = think?=A0 This will just effect the parameter names and javadoc, not any code. -bryan ________________________________________ From: jdb...@li... [mailto:jdb...@li...] On Behalf Of Kevin = Day Sent: Saturday, April 01, 2006 4:12 PM To: JDBM Developer listserv Subject: re[2]: [Jdbm-developer] re[2]: DefaultTransactionManager - = draft of prototype. Bryan- =A0 I need to implement something along these lines in the transaction = manager anyway (I already have the proof of concept for that put together), so = I am going to implement the thread local thing at that level.=A0 That = approach fixes a number of dependency issues that I've been unhappy with in the implementation, so I'm going to give it a crack. =A0 Once I have that code written (and we have a place to share it :-)=A0 = ), I think that it will become evident why Alex suggested this approach.=A0 = Now, whether this has any place in the locking subsystem or not, I can't = really say.=A0 For the time being, let's just move forward assuming that the = calls to lock() and unlock() will all have a parameter that describes the = context that the lock is being made in.=A0 I think that this lock context is = going to be more important in the deadlock detection arena than the actual = locking arena (although it will, I suspect, be important for properly handling simple re-entrancy and lock upgrades). =A0 So, I'm going to implement assuming a lock manager interface (I'll = knock something together), then we can hash out the differences between what = the transaction manager needs, and what can be provided by your current = design. =A0 Sound good? =A0 Cheers! =A0 - K =A0=20 >=20 Kevin,=20 =A0 I am still not quite getting it.=20 =A0 The constraint that I am seeking to enforce is that at most one thread = may do work on (invoke methods on the API of) a transaction at a time. = =A0This constraint is achieved by binding a thread and a transaction. =A0 =A0 I realize that I must be misusing the thread local variable since = having on the tx makes no sense =A0that should be a normal private field, in = which case it would enforce the constraint that only the thread whose reference = was set on that field could invoke methods on the tx. =A0Another thread could = obtain the tx only iff the tx had first been released. =A0So I am not sure = that I understand what the thread local variable was supposed to be used for = since it looks like the semantics can be obtained directly from a normal = field. =A0 =A0 If a thread local variable is shared by all access executing in the = same thread, then I am not clear how this would be applied to create the constraint that at most one thread at a time may do work on a tx.=20 =A0 Perhaps it would be easier and more robust to make the queue / lock = smarter so that it tracked internally the thread that a transaction was bound = to during a lock request. =A0E.g., using a hash map based on the = transaction object and checking for the existence of an entry for that transaction. = =A0The value of the entry would be the thread in which the tx was requesting a = lock (or blocked while requesting a lock). =A0At that point multiple threads = could execute operations on the same transaction interface, but blocking in = any thread would block all threads for that transaction. =A0Requests by = multiple threads for the same lock would need to be coalesced within the queue, = but all blocked threads would need to be notified.=20 =A0 -bryan=20 =A0 From: jdb...@li... [mailto:jdb...@li...] On Behalf Of Kevin = Day=20 Sent: Thursday, March 30, 2006 8:57 PM To: JDBM Developer listserv Subject: [Jdbm-developer] re[2]: DefaultTransactionManager - draft of prototype.=20 =A0 Bryan-=20 =A0 Just had a Homer Simpson 'dooh' realization on the way home. =A0The Queue.lock() method doesn't need to know anything about the current transaction - if lock() blocks, then there is an implicit 1:1 = relationship between the current thread and the transaction. =A0If lock() doesn't = block, then the transaction itself is responsible for freeing it's own locks. = =A0In neither scenario is having the transaction object itself necessary. = =A0That means that asserting the current transaction is sufficient to guarantee proper operation, even with suspending and resuming a transaction.=20 =A0 We do still need to be able to obtain the transaction object from = thread local storage for other purposes though (interaction with the version manager, for example), so my suggestions still stand.=20 =A0 Anyway - I thought I'd better write this now so you don't think I'm a = total idiot when you get my first email :-)=20 =A0 - K=20 =A0 =A0 Bryan-=20 =A0 I've done a cursory glance through this code (this will be a heck of a = lot easier when we have svn access!)...=20 =A0 The only thing that stands out that I feel needs comments is the currentThread variable... =A0I think that you want this to be = threadLocalTx.=20 =A0 So, the call to getCurrentThread and setCurrentThread will actually = morph into getCurrentTx and setCurrentTx, etc..., as in:=20 =A0 =A0 =A0=A0=A0=A0=A0=A0=A0synchronized public static Tx getCurrentTx() {=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0return (Tx) threadLocalTx.get(); =A0=A0=A0=A0=A0=A0=A0=A0}=20 =A0 =A0=A0=A0=A0=A0=A0=A0synchronized public static void setCurrentTx(Tx = contextTx) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if (threadLocalTx.get() !=3D null) = { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalStateException("Tx already assigned for this thread"); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0threadLocalTx.set(contextTx); =A0=A0=A0=A0=A0=A0=A0=A0}=20 =A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Throw exception if the specified lock = context is not bound to the current thread.=20 =A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static void = assertCurrentTx() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if (threadLocalLockContext.get() = !=3D this) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalStateException("Tx context not bound to this thread"); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0}=20 =A0 =A0 I didn't see an implementation for the Queue class itself, but I = suspect that the Queue.lock() method will wind up accepting a lock context = object as a parameter. =A0In jdbm, this lock context will be the transaction = object returned by getCurrentTx().=20 =A0 Or am I maybe missing something important about how the Queue class operates? =A0It seems to me that the items on the queue must somehow be associated with the transaction that holds the lock (or is waiting for = the lock)...=20 =A0 - K=20 =A0 =A0 =A0=A0 =A0 > All, Here is some code based on our discussions that attempts to integrate the 2PL support with a transaction factory and support the binding of threads to transactions. =A0Please take a look. =A0I have made some = notes where the code is incomplete, requires features not available, etc. I am not sure where this is going right now. =A0I could see how this could be re-factored into an extensible mechanism for creating transactions and making use of the 2PL facilities, but I am not sure that this much of the control structure belongs in a generic purpose 2PL locking package rather than in some jdbm specific transaction management classes. -bryan package org.CognitiveWeb.concurrent.locking; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; /** * A factory for transaction objects supporting assignment of threads to * transactions. A thread may assign a transaction to itself iff the transaction * is not currently bound to a thread. If a lock request would result in = a * deadlock, then a {@link DeadlockException} is thrown. If a lock = request * blocks, then a transaction will remain bound to that thread until the lock * request is granted, a timeout occurs, or the transaction is aborted. = When a * thread is running, it may release the bound transaction. A = transaction that * is not associated with any thread is not running. If other = transactions are * waiting on a transaction that is not running, then those transactions will * block until the transaction is rebound to another thread and it = releases its * locks. When a transaction completes (either aborts or commits), all = locks * associated with that transaction are released. *=20 * @author <a href=3D"mailto:tho...@us...">Bryan">Bryan">href=3D"= mailto:t hom...@us...">Bryan Thompson</a> * @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29 = 14:41:46 thompsonbry Exp $ */ public class DefaultTransactionManager { =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The maximum multi-programming level. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0private final int capacity; =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The directed graph of the WAITS_FOR relation which is = used to detect =A0=A0=A0=A0=A0* deadlocks. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0protected final TxDag waitsFor; =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The active transaction objects. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0private final Set transactions; =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The queue for each in-use resource. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* FIXME This must be a weak value hash map so that = queues may be garbage =A0=A0=A0=A0=A0* collected once they are no longer in use. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0private final Map queues =3D new HashMap(); =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The maximum multi-programming level (from the = constructor). =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0public int capacity() { =A0=A0=A0=A0=A0=A0=A0=A0return capacity; =A0=A0=A0=A0} =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* The current multi-programming level. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0public int size() { =A0=A0=A0=A0=A0=A0=A0=A0return transactions.size(); =A0=A0=A0=A0} =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* Create a transaction manager. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @param capacity =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0The maximum = multiprogramming level (aka the maximum #of =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0concurrent = transactions). =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0 =A0=A0=A0=A0public DefaultTransactionManager( int capacity ) =A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0this.capacity =3D capacity; =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0waitsFor =3D new TxDag( capacity ); =A0=A0=A0=A0=A0=A0=A0=A0transactions =3D new HashSet( capacity ); =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0} =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* Create a new transaction. By default the transaction = will be bound to the =A0=A0=A0=A0=A0* thread in which this request was made. If the maximum multi-programming =A0=A0=A0=A0=A0* level would be exceeded, then this request will block = until the =A0=A0=A0=A0=A0* concurrency level has decreased. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @exception IllegalStateException =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0If the = thread is already bound to another transaction. =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0* @return The transaction. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0 =A0=A0=A0=A0public Tx createTx() =A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0return createTx( 0L, 0 ); =A0=A0=A0=A0} =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* Create a new transaction. By default the transaction = will be bound to the =A0=A0=A0=A0=A0* thread in which this request was made. If the maximum multi-programming =A0=A0=A0=A0=A0* level would be exceeded, then this request will block = until the =A0=A0=A0=A0=A0* concurrency level has decreased. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @param millis =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @param nanos =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @exception IllegalStateException =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0If the = thread is already bound to another transaction. =A0=A0=A0=A0=A0* @exception TimeoutException =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0If the = transaction could not be created in the specified =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0time. =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0* @return The transaction. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0 =A0=A0=A0=A0synchronized public Tx createTx( long millis, int nanos ) =A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0// @todo enforce multi-programming limit. = =A0e.g., using java.util.concurrent.Queue. =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0Tx tx =3D new Tx( this ); =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0transactions.add( tx ); =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0return tx; =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0} =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* Return the {@link Queue} for the identified resource. =A0=A0=A0=A0=A0* <p> =A0=A0=A0=A0=A0* Note: The {@link Queue}s are maintained in a weak = value hash map whose =A0=A0=A0=A0=A0* keys are the resource identified. Each transaction = must hold a hard =A0=A0=A0=A0=A0* reference to each queue for which it requests a lock = and must clear that =A0=A0=A0=A0=A0* hard reference once it releases its lock. Once there = are no more =A0=A0=A0=A0=A0* references to a given {@link Queue}, the garbage = collector will reclaim =A0=A0=A0=A0=A0* the queue. This is important since the number of queue = objects is bounded =A0=A0=A0=A0=A0* by the number of distinct resources in the database = and could otherwise =A0=A0=A0=A0=A0* easily exceed the available memory. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @param resource =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0The resource = identified. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @param insert =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0When true a new = {@link Queue} will be created iff none exists =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0for that resource. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @return The queue for that resource or = <code>null</code> iff =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0<code> insert =3D=3D false = </code> and there is no queue for that =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0resource. =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0 =A0=A0=A0=A0synchronized protected Queue getQueue( Object resource, = boolean insert ) =A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0if( resource =3D=3D null ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalArgumentException(); =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0Queue queue =3D (Queue) queues.get(resource); =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0if( queue =3D=3D null && insert ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0queue =3D new = Queue(waitsFor,resource); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0queues.put( resource, queue ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0return queue; =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0} =A0=A0=A0=A0 =A0=A0=A0=A0/** =A0=A0=A0=A0=A0* A transaction object. A transaction object may only be = used by the thread =A0=A0=A0=A0=A0* to which it is currently bound. If the transaction is = not bound to any =A0=A0=A0=A0=A0* thread, then it must be bound to a thread before any = other operations may =A0=A0=A0=A0=A0* be taken. A transaction which is not bound to a thread = is not running and =A0=A0=A0=A0=A0* any transactions waiting on that transaction can not = run the transaction =A0=A0=A0=A0=A0* has been bound to a thread and its locks have been = released. =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @author <a href=3D"mailto:tho...@us...">Bryan">Bryan">href=3D"= mailto:t hom...@us...">Bryan Thompson</a> =A0=A0=A0=A0=A0* @version $Id: DefaultTransactionManager.java,v 1.1 = 2006/03/29 14:41:46 thompsonbry Exp $ =A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0* @todo Define a transaction identifier, expose it, and = use it as the key =A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0in {@link = DefaultTransactionManager#transactions}? =A0=A0=A0=A0=A0*/ =A0=A0=A0=A0public static class Tx { =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* The transaction manager. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0private final DefaultTransactionManager txMgr; =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* The {@link Queue} for each lock requested = by this transaction. The =A0=A0=A0=A0=A0=A0=A0=A0=A0* queue is added to this collection before = the lock request is made and =A0=A0=A0=A0=A0=A0=A0=A0=A0* is cleared from the collection once the = lock has either been released =A0=A0=A0=A0=A0=A0=A0=A0=A0* or when a deadlock or timeout exception = was thrown at the time that =A0=A0=A0=A0=A0=A0=A0=A0=A0* the lock was requested. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0private final Set locks =3D new HashSet(); =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* The transaction manager. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0public DefaultTransactionManager = getTranasctionManager() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0return txMgr; =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Create a new transaction object. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0protected Tx( DefaultTransactionManager txMgr ) = { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( txMgr =3D=3D null ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalArgumentException(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0this.txMgr =3D txMgr; =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0setCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* A thread local variable whose value is = either the thread to which the =A0=A0=A0=A0=A0=A0=A0=A0=A0* transaction is currently bound or = <code>null</code> iff the =A0=A0=A0=A0=A0=A0=A0=A0=A0* transaction is not currently bound to any = thread. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo Review synchronization requirements = for methods that access and =A0=A0=A0=A0=A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0set this thread local = variable. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0private static ThreadLocal currentThread =3D = new ThreadLocal() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0protected synchronized Object = initialValue() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0return null; =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0}; =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Return the thread to which this = transaction is currently bound or =A0=A0=A0=A0=A0=A0=A0=A0=A0* <code>null</code> iff the transaction is = not bound to any =A0=A0=A0=A0=A0=A0=A0=A0=A0* thread. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static Thread = getCurrentThread() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0return (Thread) = currentThread.get(); =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Release the transaction from the thread of = the current context. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @exception IllegalStateException =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if the transaction is not = bound to the thread of the =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0current context. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static void = releaseCurrentThread() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0assertCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0currentThread.set(null); =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Bind the transaction to the thread of the = current context. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @exception IllegalStateException =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if the transaction is = already bound to a thread. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static void = setCurrentThread() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if (currentThread.get() !=3D null) = { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalStateException("thread already assigned"); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0currentThread.set(Thread.currentThre= ad()); =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Throw exception if the {@link Tx} is not = bound to the thread of =A0=A0=A0=A0=A0=A0=A0=A0=A0* the current context. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0synchronized public static void = assertCurrentThread() { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if (currentThread.get() !=3D = Thread.currentThread()) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalStateException( =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= "transaction not bound to this thread"); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Request a lock on the identified resource = in the specified mode. This =A0=A0=A0=A0=A0=A0=A0=A0=A0* method will block until the lock can be = granted. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @param resource =A0=A0=A0=A0=A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0The = resource identifier. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @param mode =A0=A0=A0=A0=A0=A0=A0=A0=A0* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0The mode = - see {@link LockMode). =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @exception DeadlockException =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if the lock request would = cause a deadlock. =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo add timeout variant of this method. =A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo add non-blocking variant of this = method. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0public void lock( final Object resource, final = short mode ) =A0=A0=A0=A0=A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( resource =3D=3D null ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalArgumentException(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0assertCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0final Queue queue =3D = txMgr.getQueue( resource, true ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0/* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* Note: If locks are reentrant, = then we need to be careful to only =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* clear the Queue reference from = [locks] once the transaction has =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* reduced the lock counter to = zero for that queue. =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0*/=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0final boolean exists =3D locks.add( = queue ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0try { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0queue.lock( mode ); // = @todo lock( this, mode ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0catch( DeadlockException t ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( ! exists ) = locks.remove( queue ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw t; =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0catch( TimeoutException t ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( ! exists ) = locks.remove( queue ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw t; =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0catch( Throwable t ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( ! exists ) = locks.remove( queue ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = RuntimeException( t ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* Release the lock on the identified = resource. =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @param resource =A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0* @exception IllegalStateException =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if the transaction does = not have a lock on that =A0=A0=A0=A0=A0=A0=A0=A0=A0* = =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0resource. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0public void unlock( Object resource ) =A0=A0=A0=A0=A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( resource =3D=3D null ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalArgumentException(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0assertCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0Queue queue =3D txMgr.getQueue( = resource, true ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( queue =3D=3D null ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = IllegalStateException( =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= "no queue for resource: resource=3D" + resource); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0queue.unlock(); // @todo unlock( = this ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0if( ! locks.remove( queue ) ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0throw new = AssertionError(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0/** =A0=A0=A0=A0=A0=A0=A0=A0=A0* This method MUST be invoked to signal that = the transaction has =A0=A0=A0=A0=A0=A0=A0=A0=A0* completed its processing and is = responsible for releasing all locks =A0=A0=A0=A0=A0=A0=A0=A0=A0* and other resources associated with the = transaction. This method MUST =A0=A0=A0=A0=A0=A0=A0=A0=A0* be used whether the transaction was = successful or aborted. =A0This method =A0=A0=A0=A0=A0=A0=A0=A0=A0* also releases the bond between the = transaction and the thread which is =A0=A0=A0=A0=A0=A0=A0=A0=A0* executing that transaction. =A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0public void complete() =A0=A0=A0=A0=A0=A0=A0=A0{ =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0/* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo Is it always possible to = call this method, e.g., on a tx =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* abort, from the thread = associated with the tx? Perhaps we need to =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* allow this method to be called = by any thread and always have it =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* release the tx - thread bond. =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0assertCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0/* =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo In order to optimize = lock release behavior both this method =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* and Queue should probably = synchronize on waitsFor. This would =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* allow us to use a unlock() = variant that did not update the =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* waitsFor graph to release each = of the locks and then use the =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* optimized methods to clear the = row and column for this =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* transaction from the waitsFor = graph. =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* The code below just steps = through all of the locks that are still =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* held by this transaction = releasing them one at a time and =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* updating the waitsFor graph = each time a lock is released. =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0*=20 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* @todo If we have re-entrant = locks then this code will not work =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* since the application will = have to release the locks the right =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* #of times. I really do not see = the point of re-entrant locks, =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* certainly not if they are = associated with a lock count as opposed =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* to simply returning immedately = if the lock is already held by the =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0* transaction. =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0*/ =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0Iterator itr =3D locks.iterator(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0while( itr.hasNext() ) { =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0Queue queue =3D (Queue) = itr.next(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0queue.unlock(); // = @todo unlock( this ); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0itr.remove(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0releaseCurrentThread(); =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0} =A0=A0=A0=A0=A0=A0=A0=A0 =A0=A0=A0=A0} // class Tx. =A0=A0=A0=A0 } < < |
From: Thompson, B. B. <BRY...@sa...> - 2006-04-03 12:19:49
|
Kevin, No problem. I will update the 2PL module along the lines that you described and we can reconcile later. It seems to me that the transaction API for jdbm2 will need to abstract all of this away from the underlying record manager (so that we can have more than one design) and away from the concurrency control logic (so that we can have more than one approach). I am curious whether the concept of a "lock context" in the 2PL module makes more or less sense than just calling this a "transaction". Locking clearly supports only one property for transactions (isolation), but the notion of lock context may be too far removed from "transaction". What do you think? This will just effect the parameter names and javadoc, not any code. -bryan _____ From: jdb...@li... [mailto:jdb...@li...] On Behalf Of Kevin Day Sent: Saturday, April 01, 2006 4:12 PM To: JDBM Developer listserv Subject: re[2]: [Jdbm-developer] re[2]: DefaultTransactionManager - draft of prototype. Bryan- I need to implement something along these lines in the transaction manager anyway (I already have the proof of concept for that put together), so I am going to implement the thread local thing at that level. That approach fixes a number of dependency issues that I've been unhappy with in the implementation, so I'm going to give it a crack. Once I have that code written (and we have a place to share it :-) ), I think that it will become evident why Alex suggested this approach. Now, whether this has any place in the locking subsystem or not, I can't really say. For the time being, let's just move forward assuming that the calls to lock() and unlock() will all have a parameter that describes the context that the lock is being made in. I think that this lock context is going to be more important in the deadlock detection arena than the actual locking arena (although it will, I suspect, be important for properly handling simple re-entrancy and lock upgrades). So, I'm going to implement assuming a lock manager interface (I'll knock something together), then we can hash out the differences between what the transaction manager needs, and what can be provided by your current design. Sound good? Cheers! - K > Kevin, I am still not quite getting it. The constraint that I am seeking to enforce is that at most one thread may do work on (invoke methods on the API of) a transaction at a time. This constraint is achieved by binding a thread and a transaction. I realize that I must be misusing the thread local variable since having on the tx makes no sense that should be a normal private field, in which case it would enforce the constraint that only the thread whose reference was set on that field could invoke methods on the tx. Another thread could obtain the tx only iff the tx had first been released. So I am not sure that I understand what the thread local variable was supposed to be used for since it looks like the semantics can be obtained directly from a normal field. If a thread local variable is shared by all access executing in the same thread, then I am not clear how this would be applied to create the constraint that at most one thread at a time may do work on a tx. Perhaps it would be easier and more robust to make the queue / lock smarter so that it tracked internally the thread that a transaction was bound to during a lock request. E.g., using a hash map based on the transaction object and checking for the existence of an entry for that transaction. The value of the entry would be the thread in which the tx was requesting a lock (or blocked while requesting a lock). At that point multiple threads could execute operations on the same transaction interface, but blocking in any thread would block all threads for that transaction. Requests by multiple threads for the same lock would need to be coalesced within the queue, but all blocked threads would need to be notified. -bryan From: jdb...@li... <mailto:jdb...@li...> [mailto:jdb...@li...] <mailto:jdb...@li...> On Behalf Of Kevin Day Sent: Thursday, March 30, 2006 8:57 PM To: JDBM Developer listserv Subject: [Jdbm-developer] re[2]: DefaultTransactionManager - draft of prototype. Bryan- Just had a Homer Simpson 'dooh' realization on the way home. The Queue.lock() method doesn't need to know anything about the current transaction - if lock() blocks, then there is an implicit 1:1 relationship between the current thread and the transaction. If lock() doesn't block, then the transaction itself is responsible for freeing it's own locks. In neither scenario is having the transaction object itself necessary. That means that asserting the current transaction is sufficient to guarantee proper operation, even with suspending and resuming a transaction. We do still need to be able to obtain the transaction object from thread local storage for other purposes though (interaction with the version manager, for example), so my suggestions still stand. Anyway - I thought I'd better write this now so you don't think I'm a total idiot when you get my first email :-) - K Bryan- I've done a cursory glance through this code (this will be a heck of a lot easier when we have svn access!)... The only thing that stands out that I feel needs comments is the currentThread variable... I think that you want this to be threadLocalTx. So, the call to getCurrentThread and setCurrentThread will actually morph into getCurrentTx and setCurrentTx, etc..., as in: synchronized public static Tx getCurrentTx() { return (Tx) threadLocalTx.get(); } synchronized public static void setCurrentTx(Tx contextTx) { if (threadLocalTx.get() != null) { throw new IllegalStateException("Tx already assigned for this thread"); } threadLocalTx.set(contextTx); } /** * Throw exception if the specified lock context is not bound to the current thread. */ synchronized public static void assertCurrentTx() { if (threadLocalLockContext.get() != this) { throw new IllegalStateException("Tx context not bound to this thread"); } } I didn't see an implementation for the Queue class itself, but I suspect that the Queue.lock() method will wind up accepting a lock context object as a parameter. In jdbm, this lock context will be the transaction object returned by getCurrentTx(). Or am I maybe missing something important about how the Queue class operates? It seems to me that the items on the queue must somehow be associated with the transaction that holds the lock (or is waiting for the lock)... - K > All, Here is some code based on our discussions that attempts to integrate the 2PL support with a transaction factory and support the binding of threads to transactions. Please take a look. I have made some notes where the code is incomplete, requires features not available, etc. I am not sure where this is going right now. I could see how this could be re-factored into an extensible mechanism for creating transactions and making use of the 2PL facilities, but I am not sure that this much of the control structure belongs in a generic purpose 2PL locking package rather than in some jdbm specific transaction management classes. -bryan package org.CognitiveWeb.concurrent.locking; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; /** * A factory for transaction objects supporting assignment of threads to * transactions. A thread may assign a transaction to itself iff the transaction * is not currently bound to a thread. If a lock request would result in a * deadlock, then a {@link DeadlockException} is thrown. If a lock request * blocks, then a transaction will remain bound to that thread until the lock * request is granted, a timeout occurs, or the transaction is aborted. When a * thread is running, it may release the bound transaction. A transaction that * is not associated with any thread is not running. If other transactions are * waiting on a transaction that is not running, then those transactions will * block until the transaction is rebound to another thread and it releases its * locks. When a transaction completes (either aborts or commits), all locks * associated with that transaction are released. * * @author <a href= <mailto:Bryan> "mailto:tho...@us...">Bryan">Bryan">href="mailto:thomps on...@us...">Bryan Thompson</a> * @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29 14:41:46 thompsonbry Exp $ */ public class DefaultTransactionManager { /** * The maximum multi-programming level. */ private final int capacity; /** * The directed graph of the WAITS_FOR relation which is used to detect * deadlocks. */ protected final TxDag waitsFor; /** * The active transaction objects. */ private final Set transactions; /** * The queue for each in-use resource. * * FIXME This must be a weak value hash map so that queues may be garbage * collected once they are no longer in use. */ private final Map queues = new HashMap(); /** * The maximum multi-programming level (from the constructor). */ public int capacity() { return capacity; } /** * The current multi-programming level. */ public int size() { return transactions.size(); } /** * Create a transaction manager. * * @param capacity * The maximum multiprogramming level (aka the maximum #of * concurrent transactions). */ public DefaultTransactionManager( int capacity ) { this.capacity = capacity; waitsFor = new TxDag( capacity ); transactions = new HashSet( capacity ); } /** * Create a new transaction. By default the transaction will be bound to the * thread in which this request was made. If the maximum multi-programming * level would be exceeded, then this request will block until the * concurrency level has decreased. * * @exception IllegalStateException * If the thread is already bound to another transaction. * * @return The transaction. */ public Tx createTx() { return createTx( 0L, 0 ); } /** * Create a new transaction. By default the transaction will be bound to the * thread in which this request was made. If the maximum multi-programming * level would be exceeded, then this request will block until the * concurrency level has decreased. * * @param millis * * @param nanos * * @exception IllegalStateException * If the thread is already bound to another transaction. * @exception TimeoutException * If the transaction could not be created in the specified * time. * * @return The transaction. */ synchronized public Tx createTx( long millis, int nanos ) { // @todo enforce multi-programming limit. e.g., using java.util.concurrent.Queue. Tx tx = new Tx( this ); transactions.add( tx ); return tx; } /** * Return the {@link Queue} for the identified resource. * <p> * Note: The {@link Queue}s are maintained in a weak value hash map whose * keys are the resource identified. Each transaction must hold a hard * reference to each queue for which it requests a lock and must clear that * hard reference once it releases its lock. Once there are no more * references to a given {@link Queue}, the garbage collector will reclaim * the queue. This is important since the number of queue objects is bounded * by the number of distinct resources in the database and could otherwise * easily exceed the available memory. * * @param resource * The resource identified. * * @param insert * When true a new {@link Queue} will be created iff none exists * for that resource. * * @return The queue for that resource or <code>null</code> iff * <code> insert == false </code> and there is no queue for that * resource. */ synchronized protected Queue getQueue( Object resource, boolean insert ) { if( resource == null ) { throw new IllegalArgumentException(); } Queue queue = (Queue) queues.get(resource); if( queue == null && insert ) { queue = new Queue(waitsFor,resource); queues.put( resource, queue ); } return queue; } /** * A transaction object. A transaction object may only be used by the thread * to which it is currently bound. If the transaction is not bound to any * thread, then it must be bound to a thread before any other operations may * be taken. A transaction which is not bound to a thread is not running and * any transactions waiting on that transaction can not run the transaction * has been bound to a thread and its locks have been released. * * @author <a href= <mailto:Bryan> "mailto:tho...@us...">Bryan">Bryan">href="mailto:thomps on...@us...">Bryan Thompson</a> * @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29 14:41:46 thompsonbry Exp $ * * @todo Define a transaction identifier, expose it, and use it as the key * in {@link DefaultTransactionManager#transactions}? */ public static class Tx { /** * The transaction manager. */ private final DefaultTransactionManager txMgr; /** * The {@link Queue} for each lock requested by this transaction. The * queue is added to this collection before the lock request is made and * is cleared from the collection once the lock has either been released * or when a deadlock or timeout exception was thrown at the time that * the lock was requested. */ private final Set locks = new HashSet(); /** * The transaction manager. */ public DefaultTransactionManager getTranasctionManager() { return txMgr; } /** * Create a new transaction object. */ protected Tx( DefaultTransactionManager txMgr ) { if( txMgr == null ) { throw new IllegalArgumentException(); } this.txMgr = txMgr; setCurrentThread(); } /** * A thread local variable whose value is either the thread to which the * transaction is currently bound or <code>null</code> iff the * transaction is not currently bound to any thread. * * @todo Review synchronization requirements for methods that access and * set this thread local variable. */ private static ThreadLocal currentThread = new ThreadLocal() { protected synchronized Object initialValue() { return null; } }; /** * Return the thread to which this transaction is currently bound or * <code>null</code> iff the transaction is not bound to any * thread. */ synchronized public static Thread getCurrentThread() { return (Thread) currentThread.get(); } /** * Release the transaction from the thread of the current context. * * @exception IllegalStateException * if the transaction is not bound to the thread of the * current context. */ synchronized public static void releaseCurrentThread() { assertCurrentThread(); currentThread.set(null); } /** * Bind the transaction to the thread of the current context. * * @exception IllegalStateException * if the transaction is already bound to a thread. */ synchronized public static void setCurrentThread() { if (currentThread.get() != null) { throw new IllegalStateException("thread already assigned"); } currentThread.set(Thread.currentThread()); } /** * Throw exception if the {@link Tx} is not bound to the thread of * the current context. */ synchronized public static void assertCurrentThread() { if (currentThread.get() != Thread.currentThread()) { throw new IllegalStateException( "transaction not bound to this thread"); } } /** * Request a lock on the identified resource in the specified mode. This * method will block until the lock can be granted. * * @param resource * The resource identifier. * * @param mode * The mode - see {@link LockMode). * * @exception DeadlockException * if the lock request would cause a deadlock. * * @todo add timeout variant of this method. * @todo add non-blocking variant of this method. */ public void lock( final Object resource, final short mode ) { if( resource == null ) { throw new IllegalArgumentException(); } assertCurrentThread(); final Queue queue = txMgr.getQueue( resource, true ); /* * Note: If locks are reentrant, then we need to be careful to only * clear the Queue reference from [locks] once the transaction has * reduced the lock counter to zero for that queue. */ final boolean exists = locks.add( queue ); try { queue.lock( mode ); // @todo lock( this, mode ); } catch( DeadlockException t ) { if( ! exists ) locks.remove( queue ); throw t; } catch( TimeoutException t ) { if( ! exists ) locks.remove( queue ); throw t; } catch( Throwable t ) { if( ! exists ) locks.remove( queue ); throw new RuntimeException( t ); } } /** * Release the lock on the identified resource. * * @param resource * * @exception IllegalStateException * if the transaction does not have a lock on that * resource. */ public void unlock( Object resource ) { if( resource == null ) { throw new IllegalArgumentException(); } assertCurrentThread(); Queue queue = txMgr.getQueue( resource, true ); if( queue == null ) { throw new IllegalStateException( "no queue for resource: resource=" + resource); } queue.unlock(); // @todo unlock( this ); if( ! locks.remove( queue ) ) { throw new AssertionError(); } } /** * This method MUST be invoked to signal that the transaction has * completed its processing and is responsible for releasing all locks * and other resources associated with the transaction. This method MUST * be used whether the transaction was successful or aborted. This method * also releases the bond between the transaction and the thread which is * executing that transaction. */ public void complete() { /* * @todo Is it always possible to call this method, e.g., on a tx * abort, from the thread associated with the tx? Perhaps we need to * allow this method to be called by any thread and always have it * release the tx - thread bond. */ assertCurrentThread(); /* * @todo In order to optimize lock release behavior both this method * and Queue should probably synchronize on waitsFor. This would * allow us to use a unlock() variant that did not update the * waitsFor graph to release each of the locks and then use the * optimized methods to clear the row and column for this * transaction from the waitsFor graph. * * The code below just steps through all of the locks that are still * held by this transaction releasing them one at a time and * updating the waitsFor graph each time a lock is released. * * @todo If we have re-entrant locks then this code will not work * since the application will have to release the locks the right * #of times. I really do not see the point of re-entrant locks, * certainly not if they are associated with a lock count as opposed * to simply returning immedately if the lock is already held by the * transaction. */ Iterator itr = locks.iterator(); while( itr.hasNext() ) { Queue queue = (Queue) itr.next(); queue.unlock(); // @todo unlock( this ); itr.remove(); } releaseCurrentThread(); } } // class Tx. } < < ------------------------------------------------------- This SF.Net email is sponsored by xPML, a groundbreaking scripting language that extends applications into web and mobile media. Attend the live webcast and join the prime developer group breaking into this new coding territory! http://sel.as-us.falkag.net/sel?cmd=lnk&kid=110944&bid=241720&dat=121642 _______________________________________________ Jdbm-developer mailing list Jdb...@li... https://lists.sourceforge.net/lists/listinfo/jdbm-developer |
From: Kevin D. <ke...@tr...> - 2006-04-03 15:31:06
|
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN"> <HTML><HEAD> <STYLE type=text/css> P, UL, OL, DL, DIR, MENU, PRE { margin: 0 auto;}</STYLE> <META content="MSHTML 6.00.2900.2802" name=GENERATOR></HEAD> <BODY leftMargin=1 topMargin=1 rightMargin=1><FONT face=Tahoma> <DIV><FONT face=Arial size=2>Bryan-</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2></FONT></DIV> <DIV><FONT face=Arial size=2>In the world I live in (where the sky is purple and dogs talk :-) ), I think of a lock context almost entirely as equivalent to a transaction - but keeping the concept of a lock context separate helps (in my mind at least) me to remember not to put lock related code into the transaction sub-system.</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2>In the recman implementation that I have got going now, I have put together an interface for a lock manager that receives event notifications as the user interacts with the record manager. It is then the lock manager's responsibility to provide the correct behavior.</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2>Here are the events I have (I'm currently working with 'int' recids, eventually this will be expanded to long):</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2>beforeRead(int resource, Object lockContext)</FONT></DIV> <DIV><FONT face=Arial size=2>beforeChange(int resource, Object lockContext)</FONT></DIV> <DIV><FONT face=Arial size=2>beforeCommit(Object lockContext)</FONT></DIV> <DIV><FONT face=Arial size=2>beforeAbort(Object lockContext)</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2>In a 2PL implementation, the beforeRead and beforeChange calls can block. beforeCommit and beforeAbort both release all locks held by the specified context. Natch, your current queue and DAG implementations would be used to implement this.</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2>In an optimistic locking implementation, the beforeRead and beforeChange calls will be used to accumulate the read and write set of a given lock context, but would not block. The beforeCommit would evaluate the read/write sets and abort if they aren't compatible. beforeAbort would just be for cleanup.</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2>- K</FONT></DIV> <DIV><FONT face=Arial size=2></FONT> </DIV> <DIV><FONT face=Arial size=2> </FONT> <TABLE> <TBODY> <TR> <TD width=1 bgColor=blue><FONT face=Arial size=2></FONT></TD> <TD><FONT face=Arial size=2><FONT color=blue>> <BR>Kevin, <BR> <BR>No problem. I will update the 2PL module along the lines that you described and we can reconcile later. It seems to me that the transaction API for jdbm2 will need to abstract all of this away from the underlying record manager (so that we can have more than one design) and away from the concurrency control logic (so that we can have more than one approach). <BR> <BR>I am curious whether the concept of a lock context in the 2PL module makes more or less sense than just calling this a transaction. Locking clearly supports only one property for transactions (isolation), but the notion of lock context may be too far removed from transaction. What do you think? This will just effect the parameter names and javadoc, not any code. <BR> <BR>-bryan <BR> <BR><BR><BR>From: <A href="mailto:jdb...@li..."><FONT color=#0000ff>jdb...@li...</FONT></A> <A href="mailto:jdb...@li..."><FONT color=#0000ff>[mailto:jdb...@li...]</FONT></A> On Behalf Of Kevin Day<BR>Sent: Saturday, April 01, 2006 4:12 PM<BR>To: JDBM Developer listserv<BR>Subject: re[2]: [Jdbm-developer] re[2]: DefaultTransactionManager - draft of prototype. <BR> <BR><BR>Bryan- <BR><BR> <BR><BR>I need to implement something along these lines in the transaction manager anyway (I already have the proof of concept for that put together), so I am going to implement the thread local thing at that level. That approach fixes a numberof dependency issues that I've been unhappy with in the implementation, so I'm going to give it a crack. <BR><BR> <BR><BR>Once I have that code written (and we have a place to share it :-) ), I think that it will become evident why Alex suggested this approach. Now, whether this has any place in the locking subsystem or not, I can't really say. For the time being, let's just move forward assuming that the calls to lock() and unlock() will all have a parameter that describes the context that the lock is being made in. I think that this lock context is going to be more important in the deadlock detectionarena than the actual locking arena (although it will, I suspect, be important for properly handling simple re-entrancy and lock upgrades). <BR><BR> <BR><BR>So, I'm going to implement assuming a lock manager interface (I'll knock something together), then we can hash out the differences between what the transaction manager needs, and what can be provided by your current design. <BR><BR> <BR><BR>Sound good? <BR><BR> <BR><BR>Cheers! <BR><BR> <BR><BR>- K <BR><BR> <BR><BR> <BR>> <BR>Kevin, <BR> <BR>I am still not quite getting it. <BR> <BR>The constraint that I am seeking to enforce is that at most one thread may do work on (invoke methods on the API of) a transaction at a time. This constraint is achieved by binding a thread and a transaction. <BR> <BR>I realize that I must be misusing the thread local variable since having on the tx makes no sense that should be a normal private field, in which case it would enforce the constraint that only the thread whose reference was set on thatfield could invoke methods on the tx. Another thread could obtain the tx only iff the tx had first been released. So I am not sure that I understand what the thread local variable was supposed to be used for since it looks like the semantics can be obtained directly from a normal field. <BR> <BR>If a thread local variable is shared by all access executing in the same thread, then I am not clear how this would be applied to create the constraint that at most one thread at a time may do work on a tx. <BR> <BR>Perhaps it would be easier and more robust to make the queue / lock smarter so that it tracked internally the thread that a transaction was bound to during a lock request. E.g., using a hash map based on the transaction object and checking for the existence of an entry for that transaction. The value of the entry would be the thread in which the tx was requesting a lock (or blocked while requesting a lock). At that point multiple threads could execute operations on the same transaction interface, but blocking in any thread would block all threads for that transaction. Requests by multiple threads for the same lock would need to be coalesced within the queue, but all blocked threads would need to be notified. <BR> <BR>-bryan <BR> <BR><BR><BR>From: <A href="mailto:jdb...@li..."><FONT color=#0000ff>jdb...@li...</FONT></A> <A href="mailto:jdb...@li..."><FONT color=#0000ff>[mailto:jdb...@li...]</FONT></A> On Behalf Of Kevin Day <BR>Sent: Thursday, March 30, 2006 8:57 PM<BR>To: JDBM Developer listserv<BR>Subject: [Jdbm-developer] re[2]: DefaultTransactionManager - draft of prototype. <BR> <BR><BR>Bryan- <BR><BR> <BR><BR>Just had a Homer Simpson 'dooh' realization on the way home. The Queue.lock() method doesn't need to know anything about the current transaction - if lock() blocks, then there is an implicit 1:1 relationship between the current thread and the transaction. If lock() doesn't block, then the transaction itself is responsible for freeing it's own locks. In neither scenario is having the transaction object itself necessary. That means that asserting the current transaction is sufficient to guarantee proper operation, even with suspending and resuming a transaction. <BR><BR> <BR><BR>We do still need to be able to obtain the transaction object from thread local storage for other purposes though (interaction with the version manager, for example), so my suggestions still stand. <BR><BR> <BR><BR>Anyway - I thought I'd better write this now so you don't think I'm a total idiot when you get my first email :-) <BR><BR> <BR><BR>- K <BR><BR> <BR><BR> <BR><BR>Bryan- <BR><BR> <BR><BR>I've done a cursory glance through this code (this will be a heck of a lot easier when we have svn access!)... <BR><BR> <BR><BR>The only thing that stands out that I feel needs comments is the currentThread variable... I think that you want this to be threadLocalTx. <BR><BR> <BR><BR>So, the call to getCurrentThread and setCurrentThread will actually morph into getCurrentTx and setCurrentTx, etc..., as in: <BR><BR> <BR><BR> <BR><BR> synchronized public static Tx getCurrentTx() { <BR><BR> return (Tx) threadLocalTx.get();<BR> } <BR><BR> <BR><BR> synchronized public static void setCurrentTx(Tx contextTx) {<BR> if (threadLocalTx.get() != null) {<BR> throw new IllegalStateException("Tx already assigned for this thread");<BR> }<BR> threadLocalTx.set(contextTx);<BR> } <BR><BR> /**<BR> * Throw exception if the specified lock context is not bound to the current thread. <BR><BR> */<BR> synchronized public static void assertCurrentTx() {<BR> if (threadLocalLockContext.get() != this) {<BR> throw new IllegalStateException("Tx context not bound to this thread");<BR> }<BR> } <BR><BR> <BR><BR> <BR><BR>I didn't see an implementation for the Queue class itself, but I suspect that the Queue.lock() method will wind up accepting a lock context object as a parameter. In jdbm, this lock context will be the transaction object returned by getCurrentTx(). <BR><BR> <BR><BR>Or am I maybe missing something important about how the Queue class operates? It seems to me that the items on the queue must somehow be associated with the transaction that holds the lock (or is waiting for the lock)... <BR><BR> <BR><BR>- K <BR><BR> <BR><BR> <BR><BR> <BR><BR> <BR>>All,<BR><BR>Here is some code based on our discussions that attempts to integrate<BR>the 2PL support with a transaction factory and support the binding of<BR>threads to transactions. Please take a look. I have made some notes<BR>where the code is incomplete, requires features not available, etc.<BR><BR>I am not sure where this is going right now. I could see how this<BR>could be re-factored into an extensible mechanism for creating<BR>transactions and making use of the 2PL facilities, but I am not sure<BR>that this much of the control structure belongs in a generic purpose<BR>2PL locking package rather than in some jdbm specific transaction<BR>management classes.<BR><BR>-bryan<BR><BR>package org.CognitiveWeb.concurrent.locking;<BR><BR>import java.util.HashMap;<BR>import java.util.HashSet;<BR>import java.util.Iterator;<BR>import java.util.Map;<BR>import java.util.Set;<BR><BR>/**<BR>* A factory for transaction objects supporting assignment of threads to<BR>* transactions. A thread may assign a transaction to itself iff the<BR>transaction<BR>* is not currently bound to a thread. If a lock request would result in a<BR>* deadlock, then a {@link DeadlockException} is thrown. If a lock request<BR>* blocks, then a transaction will remain bound to that thread until the<BR>lock<BR>* request is granted, a timeout occurs, or the transaction is aborted. When<BR>a<BR>* thread is running, it may release the bound transaction. A transaction<BR>that<BR>* is not associated with any thread is not running. If other transactions<BR>are<BR>* waiting on a transaction that is not running, then those transactions<BR>will<BR>* block until the transaction is rebound to another thread and it releases<BR>its<BR>* locks. When a transaction completes (either aborts or commits), all locks<BR>* associated with that transaction are released.<BR>* <BR>* @author <a href="mailto:tho...@us...">Bryan">Bryan">href="mailto:tho...@us...">Bryan<BR>Thompson</a><BR>* @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29 14:41:46<BR>thompsonbry Exp $<BR>*/<BR><BR>public class DefaultTransactionManager<BR>{<BR> <BR> /**<BR> * The maximum multi-programming level.<BR> */<BR> private final int capacity;<BR> <BR> /**<BR> * The directed graph of the WAITS_FOR relation which is used to<BR>detect<BR> * deadlocks.<BR> */<BR> protected final TxDag waitsFor;<BR><BR> /**<BR> * The active transaction objects.<BR> */<BR> private final Set transactions;<BR><BR> /**<BR> * The queue for each in-use resource.<BR> * <BR> * FIXME This must be a weak value hash map so that queues may be<BR>garbage<BR> * collected once they are no longer in use.<BR> */<BR> private final Map queues = new HashMap();<BR> <BR> /**<BR> * The maximum multi-programming level (from the constructor).<BR> */<BR> public int capacity() {<BR> return capacity;<BR> }<BR><BR> /**<BR> * The current multi-programming level.<BR> */<BR> public int size() {<BR> return transactions.size();<BR> }<BR> <BR> /**<BR> * Create a transaction manager.<BR> * <BR> * @param capacity<BR> * The maximum multiprogramming level (aka the maximum<BR>#of<BR> * concurrent transactions).<BR> */<BR> <BR> public DefaultTransactionManager( int capacity )<BR> {<BR> <BR> this.capacity = capacity;<BR> <BR> waitsFor = new TxDag( capacity );<BR><BR> transactions = new HashSet( capacity );<BR> <BR> }<BR><BR> /**<BR> * Create a new transaction. By default the transaction will be<BR>bound to the<BR> * thread in which this request was made. If the maximum<BR>multi-programming<BR> * level would be exceeded, then this request will block until the<BR> * concurrency level has decreased.<BR> * <BR> * @exception IllegalStateException<BR> * If the thread is already bound to another<BR>transaction.<BR> * <BR> * @return The transaction.<BR> */<BR> <BR> public Tx createTx()<BR> {<BR> <BR> return createTx( 0L, 0 );<BR><BR> }<BR> <BR> /**<BR> * Create a new transaction. By default the transaction will be<BR>bound to the<BR> * thread in which this request was made. If the maximum<BR>multi-programming<BR> * level would be exceeded, then this request will block until the<BR> * concurrency level has decreased.<BR> * <BR> * @param millis<BR> * <BR> * @param nanos<BR> * <BR> * @exception IllegalStateException<BR> * If the thread is already bound to another<BR>transaction.<BR> * @exception TimeoutException<BR> * If the transaction could not be created in the<BR>specified<BR> * time.<BR> * <BR> * @return The transaction.<BR> */<BR> <BR> synchronized public Tx createTx( long millis, int nanos )<BR> {<BR> <BR> // @todo enforce multi-programming limit. e.g., using<BR>java.util.concurrent.Queue.<BR> <BR> Tx tx = new Tx( this );<BR> <BR> transactions.add( tx );<BR> <BR> return tx;<BR> <BR> }<BR><BR> /**<BR> * Return the {@link Queue} for the identified resource.<BR> * <p><BR> * Note: The {@link Queue}s are maintained in a weak value hash map<BR>whose<BR> * keys are the resource identified. Each transaction must hold a<BR>hard<BR> * reference to each queue for which it requests a lock and must<BR>clear that<BR> * hard reference once it releases its lock. Once there are no more<BR> * references to a given {@link Queue}, the garbage collector will<BR>reclaim<BR> * the queue. This is important since the number of queue objects is<BR>bounded<BR> * by the number of distinct resources in the database and could<BR>otherwise<BR> * easily exceed the available memory.<BR> * <BR> * @param resource<BR> * The resource identified.<BR> * <BR> * @param insert<BR> * When true a new {@link Queue} will be created iff none<BR>exists<BR> * for that resource.<BR> * <BR> * @return The queue for that resource or <code>null</code> iff<BR> * <code> insert == false </code> and there is no queue for<BR>that<BR> * resource.<BR> */<BR> <BR> synchronized protected Queue getQueue( Object resource, boolean<BR>insert )<BR> {<BR> <BR> if( resource == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR> <BR> Queue queue = (Queue) queues.get(resource);<BR> <BR> if( queue == null && insert ) {<BR> <BR> queue = new Queue(waitsFor,resource);<BR> <BR> queues.put( resource, queue );<BR> <BR> }<BR> <BR> return queue;<BR> <BR> }<BR> <BR> /**<BR> * A transaction object. A transaction object may only be used by<BR>the thread<BR> * to which it is currently bound. If the transaction is not bound<BR>to any<BR> * thread, then it must be bound to a thread before any other<BR>operations may<BR> * be taken. A transaction which is not bound to a thread is not<BR>running and<BR> * any transactions waiting on that transaction can not run the<BR>transaction<BR> * has been bound to a thread and its locks have been released.<BR> * <BR> * @author <a href="mailto:tho...@us...">Bryan">Bryan">href="mailto:tho...@us...">Bryan<BR>Thompson</a><BR> * @version $Id: DefaultTransactionManager.java,v 1.1 2006/03/29<BR>14:41:46 thompsonbry Exp $<BR> * <BR> * @todo Define a transaction identifier, expose it, and use it as<BR>the key<BR> * in {@link DefaultTransactionManager#transactions}?<BR> */<BR><BR> public static class Tx {<BR><BR> /**<BR> * The transaction manager.<BR> */<BR> private final DefaultTransactionManager txMgr;<BR><BR> /**<BR> * The {@link Queue} for each lock requested by this<BR>transaction. The<BR> * queue is added to this collection before the lock request<BR>is made and<BR> * is cleared from the collection once the lock has either<BR>been released<BR> * or when a deadlock or timeout exception was thrown at the<BR>time that<BR> * the lock was requested.<BR> */<BR> private final Set locks = new HashSet();<BR> <BR> /**<BR> * The transaction manager.<BR> */<BR><BR> public DefaultTransactionManager getTranasctionManager() {<BR> return txMgr;<BR> }<BR><BR> /**<BR> * Create a new transaction object.<BR> */<BR> protected Tx( DefaultTransactionManager txMgr ) {<BR> if( txMgr == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR> this.txMgr = txMgr;<BR> setCurrentThread();<BR> }<BR> <BR> /**<BR> * A thread local variable whose value is either the thread<BR>to which the<BR> * transaction is currently bound or <code>null</code> iff<BR>the<BR> * transaction is not currently bound to any thread.<BR> * <BR> * @todo Review synchronization requirements for methods<BR>that access and<BR> * set this thread local variable.<BR> */<BR> private static ThreadLocal currentThread = new ThreadLocal()<BR>{<BR> protected synchronized Object initialValue() {<BR> return null;<BR> }<BR> };<BR><BR> /**<BR> * Return the thread to which this transaction is currently<BR>bound or<BR> * <code>null</code> iff the transaction is not bound to any<BR> * thread.<BR> */<BR> synchronized public static Thread getCurrentThread() {<BR> return (Thread) currentThread.get();<BR> }<BR><BR> /**<BR> * Release the transaction from the thread of the current<BR>context.<BR> * <BR> * @exception IllegalStateException<BR> * if the transaction is not bound to the<BR>thread of the<BR> * current context.<BR> */<BR> synchronized public static void releaseCurrentThread() {<BR> assertCurrentThread();<BR> currentThread.set(null);<BR> }<BR><BR> /**<BR> * Bind the transaction to the thread of the current<BR>context.<BR> * <BR> * @exception IllegalStateException<BR> * if the transaction is already bound to a<BR>thread.<BR> */<BR> synchronized public static void setCurrentThread() {<BR> if (currentThread.get() != null) {<BR> throw new IllegalStateException("thread<BR>already assigned");<BR> }<BR> currentThread.set(Thread.currentThread());<BR> }<BR><BR> /**<BR> * Throw exception if the {@link Tx} is not bound to the<BR>thread of<BR> * the current context.<BR> */<BR> synchronized public static void assertCurrentThread() {<BR> if (currentThread.get() != Thread.currentThread()) {<BR> throw new IllegalStateException(<BR> "transaction not bound to<BR>this thread");<BR> }<BR> }<BR><BR> /**<BR> * Request a lock on the identified resource in the<BR>specified mode. This<BR> * method will block until the lock can be granted.<BR> * <BR> * @param resource<BR> * The resource identifier.<BR> * <BR> * @param mode<BR> * The mode - see {@link LockMode).<BR> * <BR> * @exception DeadlockException<BR> * if the lock request would cause a<BR>deadlock.<BR> * <BR> * @todo add timeout variant of this method.<BR> * @todo add non-blocking variant of this method.<BR> */<BR> <BR> public void lock( final Object resource, final short mode )<BR> {<BR> <BR> if( resource == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR><BR> assertCurrentThread();<BR><BR> final Queue queue = txMgr.getQueue( resource, true<BR>);<BR> <BR> /*<BR> * Note: If locks are reentrant, then we need to be<BR>careful to only<BR> * clear the Queue reference from [locks] once the<BR>transaction has<BR> * reduced the lock counter to zero for that queue.<BR> */ <BR> final boolean exists = locks.add( queue );<BR> <BR> try {<BR> queue.lock( mode ); // @todo lock( this,<BR>mode );<BR> }<BR> catch( DeadlockException t ) {<BR> if( ! exists ) locks.remove( queue );<BR> throw t;<BR> }<BR> catch( TimeoutException t ) {<BR> if( ! exists ) locks.remove( queue );<BR> throw t;<BR> }<BR> catch( Throwable t ) {<BR> if( ! exists ) locks.remove( queue );<BR> throw new RuntimeException( t );<BR> }<BR> <BR> }<BR><BR> /**<BR> * Release the lock on the identified resource.<BR> * <BR> * @param resource<BR> * <BR> * @exception IllegalStateException<BR> * if the transaction does not have a lock on<BR>that<BR> * resource.<BR> */<BR> <BR> public void unlock( Object resource )<BR> {<BR> <BR> if( resource == null ) {<BR> throw new IllegalArgumentException();<BR> }<BR> <BR> assertCurrentThread();<BR> <BR> Queue queue = txMgr.getQueue( resource, true );<BR> <BR> if( queue == null ) {<BR> <BR> throw new IllegalStateException(<BR> "no queue for resource:<BR>resource=" + resource);<BR> <BR> }<BR><BR> queue.unlock(); // @todo unlock( this );<BR><BR><BR> if( ! locks.remove( queue ) ) {<BR> <BR> throw new AssertionError();<BR> <BR> }<BR> <BR> }<BR> <BR> /**<BR> * This method MUST be invoked to signal that the<BR>transaction has<BR> * completed its processing and is responsible for releasing<BR>all locks<BR> * and other resources associated with the transaction. This<BR>method MUST<BR> * be used whether the transaction was successful or<BR>aborted. This method<BR> * also releases the bond between the transaction and the<BR>thread which is<BR> * executing that transaction.<BR> */<BR> <BR> public void complete()<BR> {<BR><BR> /*<BR> * @todo Is it always possible to call this method,<BR>e.g., on a tx<BR> * abort, from the thread associated with the tx?<BR>Perhaps we need to<BR> * allow this method to be called by any thread and<BR>always have it<BR> * release the tx - thread bond.<BR> */<BR> assertCurrentThread();<BR> <BR> /*<BR> * @todo In order to optimize lock release behavior<BR>both this method<BR> * and Queue should probably synchronize on<BR>waitsFor. This would<BR> * allow us to use a unlock() variant that did not<BR>update the<BR> * waitsFor graph to release each of the locks and<BR>then use the<BR> * optimized methods to clear the row and column for<BR>this<BR> * transaction from the waitsFor graph.<BR> * <BR> * The code below just steps through all of the<BR>locks that are still<BR> * held by this transaction releasing them one at a<BR>time and<BR> * updating the waitsFor graph each time a lock is<BR>released.<BR> * <BR> * @todo If we have re-entrant locks then this code<BR>will not work<BR> * since the application will have to release the<BR>locks the right<BR> * #of times. I really do not see the point of<BR>re-entrant locks,<BR> * certainly not if they are associated with a lock<BR>count as opposed<BR> * to simply returning immedately if the lock is<BR>already held by the<BR> * transaction.<BR> */<BR> <BR> Iterator itr = locks.iterator();<BR> <BR> while( itr.hasNext() ) {<BR> <BR> Queue queue = (Queue) itr.next();<BR> <BR> queue.unlock(); // @todo unlock( this );<BR><BR><BR> itr.remove();<BR> <BR> }<BR><BR> releaseCurrentThread();<BR> <BR> }<BR> <BR> } // class Tx.<BR> <BR>}<BR><BR><<BR><<BR><<BR></FONT></FONT></TD></TR></TBODY></TABLE></DIV></FONT></BODY></HTML> |