From: Bryan T. <tho...@us...> - 2007-10-02 19:45:35
|
Update of /cvsroot/cweb/concurrent/src/java/org/CognitiveWeb/concurrent/locking In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv6007/src/java/org/CognitiveWeb/concurrent/locking Modified Files: TxDag.java Lock.java LockContextManager.java IGrantedGroup.java GrantedGroup.java IRequestQueue.java Queue.java Added Files: ConcurrentRequestQueue.java Removed Files: RequestQueue.java Log Message: Working on some edge cases in which deadlocks arise among threads that are blocked and waiting on a resource to be released but in which the WAITS_FOR graph does NOT have any cycles. Also fixed a bug in which vertices of the WAITS_FOR graph were not being released by LockContextManager. Index: IGrantedGroup.java =================================================================== RCS file: /cvsroot/cweb/concurrent/src/java/org/CognitiveWeb/concurrent/locking/IGrantedGroup.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** IGrantedGroup.java 17 Mar 2006 14:25:28 -0000 1.4 --- IGrantedGroup.java 2 Oct 2007 19:45:26 -0000 1.5 *************** *** 64,67 **** --- 64,72 ---- /** + * True iff the granted group is empty. + */ + public boolean isEmpty(); + + /** * Return the #of transactions in the granted group. */ *************** *** 69,79 **** /** ! * Return <code>true</code> iff the <i>lock </i> is in the granted ! * group. * ! * @param lock The lock. * ! * @return <code>true</code> iff the <i>lock </i> is in the granted ! * group. */ public boolean contains(Lock lock); --- 74,83 ---- /** ! * Return <code>true</code> iff the <i>lock </i> is in the granted group. * ! * @param lock ! * The lock. * ! * @return <code>true</code> iff the <i>lock </i> is in the granted group. */ public boolean contains(Lock lock); *************** *** 89,92 **** --- 93,100 ---- * * @exception IllegalStateException + * If the <i>lock</i> is not compatible with the mode of + * the granted group.
+ * + * @exception IllegalStateException * If the <i>lock</i> is already in the granted group. */ *************** *** 115,127 **** public Lock getTxLock(Object tx); ! /** ! * Recompute the group mode. This method is used when a granted lock is ! * released or when the mode of a granted lock is updated. In these ! * cases we must rescan the granted group and re-compute the granted ! * mode. ! * ! * @return The new granted mode. ! */ ! public short computeGroupMode(); /** --- 123,135 ---- public Lock getTxLock(Object tx); ! // /** ! // * Recompute the group mode. This method is used when a granted lock is ! // * released or when the mode of a granted lock is updated. In these ! // * cases we must rescan the granted group and re-compute the granted ! // * mode. ! // * ! // * @return The new granted mode. ! // */ ! // public short computeGroupMode(); /** Index: GrantedGroup.java =================================================================== RCS file: /cvsroot/cweb/concurrent/src/java/org/CognitiveWeb/concurrent/locking/GrantedGroup.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** GrantedGroup.java 17 Mar 2006 14:25:28 -0000 1.3 --- GrantedGroup.java 2 Oct 2007 19:45:26 -0000 1.4 *************** *** 47,53 **** package org.CognitiveWeb.concurrent.locking; - import java.util.HashSet; import java.util.Iterator; ! import java.util.Set; --- 47,54 ---- package org.CognitiveWeb.concurrent.locking; import java.util.Iterator; ! import java.util.NoSuchElementException; ! import java.util.concurrent.ConcurrentHashMap; ! import java.util.concurrent.atomic.AtomicInteger; *************** *** 62,76 **** { ! /* ! * Since the multi-programming level is bounded in 10s-100s, the use of ! * an ArrayList provides relatively fast linear search (could be binary ! * search iff transactions define a Comparable ordering) for a specific ! * lock combined with simple semantics and the ability to traverse the ! * list w/o creating an Iterator. 
In contrast, the use of a Java 1.5 ! * concurrent HashSet might provide higher concurrency and faster ! * testing of whether or not a lock is a member of the granted group. ! */ // private final ArrayList data = new ArrayList(); ! private final Set grantedGroup = new HashSet(); /** --- 63,77 ---- { ! // /* ! // * Since the multi-programming level is bounded in 10s-100s, the use of ! // * an ArrayList provides relatively fast linear search (could be binary ! // * search iff transactions define a Comparable ordering) for a specific ! // * lock combined with simple semantics and the ability to traverse the ! // * list w/o creating an Iterator. In contrast, the use of a Java 1.5 ! // * concurrent HashSet might provide higher concurrency and faster ! // * testing of whether or not a lock is a member of the granted group. ! // */ // private final ArrayList data = new ArrayList(); ! private final ConcurrentHashMap<Lock,Lock> grantedGroup = new ConcurrentHashMap<Lock,Lock>(); /** *************** *** 78,86 **** * lock is released and whenever a lock request is granted. */ ! private short groupMode = LockMode.NL; ! synchronized public short getMode() { ! return groupMode; } --- 79,87 ---- * lock is released and whenever a lock request is granted. */ ! private AtomicInteger groupMode = new AtomicInteger(LockMode.NL); ! public short getMode() { ! return (short) groupMode.get(); } *************** *** 90,94 **** } ! synchronized public int size() { return grantedGroup.size(); --- 91,101 ---- } ! public boolean isEmpty() { ! ! return grantedGroup.isEmpty(); ! ! } ! ! public int size() { return grantedGroup.size(); *************** *** 96,100 **** } ! synchronized public boolean contains(Lock lock) { if (lock == null) { --- 103,107 ---- } ! public boolean contains(Lock lock) { if (lock == null) { *************** *** 104,108 **** } ! return grantedGroup.contains(lock); } --- 111,115 ---- } ! return grantedGroup.containsKey(lock); } *************** *** 122,128 **** } ! 
if (!grantedGroup.add(lock)) { ! throw new IllegalStateException("lock already in granted group"); } --- 129,141 ---- } ! if(!isCompatibleRequest(lock.getMode())) { ! ! throw new IllegalStateException("Lock not compatible with granted group mode"); ! ! } ! ! if (grantedGroup.putIfAbsent(lock, lock) != null) { ! throw new IllegalStateException("Lock already in granted group"); } *************** *** 142,149 **** } ! if (grantedGroup.remove(lock)) { // Re-compute the group mode. ! computeGroupMode(); return true; --- 155,162 ---- } ! if (grantedGroup.remove(lock)!=null) { // Re-compute the group mode. ! groupMode.set(computeGroupMode()); return true; *************** *** 155,159 **** } ! synchronized public Lock getTxLock(Object tx) { if (tx == null) { --- 168,172 ---- } ! public Lock getTxLock(Object tx) { if (tx == null) { *************** *** 164,175 **** /* ! * @todo optimize this if we change the grantedGroup Collection ! * class. */ ! Iterator itr = grantedGroup.iterator(); while (itr.hasNext()) { ! Lock grantedLock = (Lock) itr.next(); if (grantedLock.getTransaction() == tx) { --- 177,198 ---- /* ! * @todo optimize this? hash set/map iterators are slow. */ ! Iterator<Lock> itr = grantedGroup.keySet().iterator(); while (itr.hasNext()) { ! Lock grantedLock; ! ! try { ! ! grantedLock = (Lock) itr.next(); ! ! } catch (NoSuchElementException ex) { ! ! // concurrent removal. ! break; ! ! } if (grantedLock.getTransaction() == tx) { *************** *** 191,211 **** * @param mode * The mode of a lock being admitted to the granted group. - * - * @see #computeGroupMode() */ - private void updateGroupMode(short mode) { LockMode.assertLockedMode(mode); ! groupMode = LockMode.getSuperMode(groupMode, mode); } ! synchronized public short computeGroupMode() { ! groupMode = LockMode.NL; ! if (grantedGroup.size() == 0) { return groupMode; --- 214,238 ---- * @param mode * The mode of a lock being admitted to the granted group. 
*/ private void updateGroupMode(short mode) { LockMode.assertLockedMode(mode); ! groupMode.set(LockMode.getSuperMode(getMode(), mode)); } ! /** ! * Recompute the group mode. This method is used when a granted lock is ! * released or when the mode of a granted lock is updated. In these cases we ! * must rescan the granted group and re-compute the granted mode. ! * ! * @return The new granted mode. ! */ ! private short computeGroupMode() { ! short groupMode = LockMode.NL; ! if (grantedGroup.isEmpty()) { return groupMode; *************** *** 213,217 **** } ! Iterator itr = grantedGroup.iterator(); while (itr.hasNext()) { --- 240,244 ---- } ! Iterator<Lock> itr = grantedGroup.keySet().iterator(); while (itr.hasNext()) { *************** *** 227,241 **** } ! synchronized public boolean isCompatibleRequest(short mode) { LockMode.assertLockedMode(mode); ! return LockMode.isCompatible(mode, groupMode); } ! synchronized public Lock[] getGrantedLocks() { ! return (Lock[]) grantedGroup.toArray(new Lock[] {}); } --- 254,268 ---- } ! public boolean isCompatibleRequest(short mode) { LockMode.assertLockedMode(mode); ! return LockMode.isCompatible(mode, getMode()); } ! public Lock[] getGrantedLocks() { ! return (Lock[]) grantedGroup.keySet().toArray(new Lock[] {}); } --- NEW FILE: ConcurrentRequestQueue.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. 
See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Mar 3, 2006 */ package org.CognitiveWeb.concurrent.locking; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.LinkedBlockingQueue; /** * Implementation based on {@link LinkedBlockingQueue}. */ class ConcurrentRequestQueue implements IRequestQueue { private final LinkedBlockingQueue<Lock> requestQueue; ConcurrentRequestQueue() { this.requestQueue = new LinkedBlockingQueue<Lock>(); } ConcurrentRequestQueue(int capacity) { this.requestQueue = new LinkedBlockingQueue<Lock>(capacity); } public boolean isEmpty() { return requestQueue.isEmpty(); } public int size() { return requestQueue.size(); } public boolean contains( Lock lock ) { if( lock == null ) { throw new IllegalArgumentException(); } return requestQueue.contains( lock ); } public void add( Lock lock ) { if( lock == null ) { throw new IllegalArgumentException(); } requestQueue.add( lock ); } public boolean remove( Lock lock ) { if( lock == null ) { throw new IllegalArgumentException(); } return requestQueue.remove( lock ); } public Lock getTxLock( Object tx ) { if (tx == null) { throw new IllegalArgumentException(); } Iterator<Lock> itr = requestQueue.iterator(); while (itr.hasNext()) { final Lock pendingLock; try { pendingLock = itr.next(); } catch(NoSuchElementException ex) { // concurrent removal
- queue is now empty. break; } if (pendingLock.getTransaction() == tx) { return pendingLock; } } return null; } public Lock[] getRequestedLocks() { return (Lock[]) requestQueue.toArray(new Lock[]{}); } } Index: IRequestQueue.java =================================================================== RCS file: /cvsroot/cweb/concurrent/src/java/org/CognitiveWeb/concurrent/locking/IRequestQueue.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** IRequestQueue.java 17 Mar 2006 14:25:28 -0000 1.4 --- IRequestQueue.java 2 Oct 2007 19:45:26 -0000 1.5 *************** *** 65,68 **** --- 65,73 ---- /** + * True iff the request queue is empty. + */ + public boolean isEmpty(); + + /** * The #of lock requests in the queue. */ *************** *** 115,117 **** --- 120,123 ---- */ public Lock[] getRequestedLocks(); + } Index: TxDag.java =================================================================== RCS file: /cvsroot/cweb/concurrent/src/java/org/CognitiveWeb/concurrent/locking/TxDag.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** TxDag.java 7 Apr 2006 16:50:04 -0000 1.10 --- TxDag.java 2 Oct 2007 19:45:25 -0000 1.11 *************** *** 277,281 **** /** ! * A list containing {@link Integer}indices available to be assigned to a * new transaction. When this list is empty, then the maximum #of * transactions are running concurrently. Entries are removed from the --- 277,281 ---- /** ! * A list containing {@link Integer} indices available to be assigned to a * new transaction. When this list is empty, then the maximum #of * transactions are running concurrently. Entries are removed from the *************** *** 283,287 **** * the list when a transaction is complete (abort or commit). */ ! private final List indices = new LinkedList(); /** --- 283,287 ---- * the list when a transaction is complete (abort or commit). */ ! 
private final List<Integer> indices = new LinkedList<Integer>(); /** *************** *** 291,295 **** * @see #indices */ ! private final Map mapping = new HashMap(); /** --- 291,295 ---- * @see #indices */ ! private final Map<Object,Integer> mapping = new HashMap<Object, Integer>(); /** *************** *** 407,419 **** } ! // Assign the transaction a free index. ! ! index = (Integer) indices.remove(0); ! ! if (index == null) { ! throw new AssertionError("no free index to assign?"); ! } mapping.put(tx, index); // add transaction to mapping. --- 407,423 ---- } ! /* ! * Assign the transaction a free index. Throws ! * IndexOutOfBoundsException if there is no free index ! * available. ! */ ! index = (Integer) indices.remove(0); ! // if (index == null) { ! // ! // throw new AssertionError("no free index to assign?"); ! // ! // } mapping.put(tx, index); // add transaction to mapping. *************** *** 451,457 **** * * @param tx */ ! synchronized void releaseVertex( Object tx ) { --- 455,463 ---- * * @param tx + * + * @return true iff the vertex was known. */ ! synchronized boolean releaseVertex( Object tx ) { *************** *** 460,464 **** if( index == null ) { ! throw new IllegalArgumentException("tx="+tx); } --- 466,473 ---- if( index == null ) { ! // throw new IllegalArgumentException("tx="+tx); ! log.info("Not a vertex"); ! ! return false; } *************** *** 478,481 **** --- 487,492 ---- // resetOrder(); // invalidate the order[] cache. + return true; + } *************** *** 1098,1104 **** --- 1109,1117 ---- for (int i = 0; i < dst.length; i++) { if( ! updateClosure( src, dst[i], true ) ) { + log.warn("deadlock"); if( debug ) { log.debug(toString()); } + System.err.println(toString()); // FIXME uncomment this line. restore(order); throw new DeadlockException("deadlock"); *************** *** 1217,1221 **** // Object[] transactions = getTransactions(); // populate array of explict edges w/ optional closure. ! 
Vector v = new Vector(); for(int i=0; i<n; i++ ) { for( int j=0; j<n; j++ ) { --- 1230,1234 ---- // Object[] transactions = getTransactions(); // populate array of explict edges w/ optional closure. ! Vector<Edge> v = new Vector<Edge>(); for(int i=0; i<n; i++ ) { for( int j=0; j<n; j++ ) { *************** *** 1497,1501 **** * have been removed. */ ! releaseVertex( tx ); if( debug ) { log.debug(toString()); --- 1510,1516 ---- * have been removed. */ ! if(!releaseVertex( tx )) { ! throw new AssertionError("Unknown vertex="+tx); ! } if( debug ) { log.debug(toString()); Index: Lock.java =================================================================== RCS file: /cvsroot/cweb/concurrent/src/java/org/CognitiveWeb/concurrent/locking/Lock.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** Lock.java 27 Mar 2006 21:27:20 -0000 1.4 --- Lock.java 2 Oct 2007 19:45:26 -0000 1.5 *************** *** 47,50 **** --- 47,53 ---- package org.CognitiveWeb.concurrent.locking; + import java.util.concurrent.locks.Condition; + import java.util.concurrent.locks.ReentrantLock; + /** * A lock obtained from a {@link Queue} governing access a database resource. 
*************** *** 54,58 **** * @author thompsonbry */ - public class Lock { --- 57,60 ---- *************** *** 229,232 **** --- 231,244 ---- return queue.isReleased( this ); } + + public String toString() { + + return super.toString() + "{resource=" + queue.getResource() + + ", mode=" + LockMode.getName(mode) + ", tx=" + tx + "}"; + + } + + // java.util.concurrent.locks.Lock lock = new ReentrantLock(); + // Condition granted = lock.newCondition(); } Index: Queue.java =================================================================== RCS file: /cvsroot/cweb/concurrent/src/java/org/CognitiveWeb/concurrent/locking/Queue.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** Queue.java 7 Apr 2006 16:50:04 -0000 1.8 --- Queue.java 2 Oct 2007 19:45:26 -0000 1.9 *************** *** 163,167 **** * The pending lock requests. */ ! private final IRequestQueue requestQueue = new RequestQueue(); /** --- 163,167 ---- * The pending lock requests. */ ! private final IRequestQueue requestQueue = new ConcurrentRequestQueue(); /** *************** *** 326,393 **** /** ! * <p> ! * Protected variant allows the caller to specify an explicit transaction ! * object rather than equating the transaction object with the thread of the ! * caller. In order to use this method the caller MUST observe the ! * constraint that at most one thread at a time may do work on a ! * transaction. Failure to observe this constraint can circumvent the ! * synchronization semantics and result in work continuing on a transaction ! * when it should be WAITing for a lock. ! * </p> ! * ! * @param tx ! * The transaction object. ! * ! * @param mode ! * The requested lock mode. ! * ! * @param timeout ! * the maximum time to wait in milliseconds. ! * ! * @param nanos ! * additional time, in nanoseconds range 0-999999. ! * ! * @return The granted {@link Lock}. ! * ! * @exception DeadlockException ! * if the lock request would result in a deadlock. ! * ! * @exception TimeoutException ! 
* if the lock could not be obtained within the specified ! * timeout period. ! * ! * @exception IllegalStateException ! * if the thread already has a granted lock on the resource ! * associated with this queue. ! * ! * @exception IllegalStateException ! * FIXME if the thread already has a pending lock request for ! * the resource associated with this queue (this exception ! * can only occur if more than one thread is doing work on ! * the same transaction at the same time - which is ! * explicitly forbidden). ! * ! * @issue [CONCURRENT-2] In order to support multiple locking granularities, ! * this method MUST examine the resource DAG as well in order to ! * determine if there is a higher level lock in progress. Further ! * locks must be granted and released in accord with the structure of ! * that resource DAG. This is coordinated by the use of the ! * "intention" modes to lock all ancestors in the resource DAG. For ! * example, when the resource DAG is a tree, the protocol to lock a ! * resource R in mode S or X is to first lock all ancestors of R in ! * that mode and then lock R. This method must handle all of that ! * transparently. ! * ! * @todo Examine the semantics when a lock is requested and has already been ! * granted. First, it should be impossible to request a lock that is ! * currently pending (iff transaction == thread). It then remains to ! * decide whether a lock mode may be modified by a lock request, ! * whether it is an error to request a lock when one has already been ! * granted, or whether it is allowed iff the same lock mode was ! * requested. ! * ! * @todo Add tryLock( short mode, long timeout, int nanos ) ! */ ! protected Lock lock( Object tx, short mode, long timeout, int nanos) throws DeadlockException, TimeoutException --- 326,397 ---- /** ! * <p> ! * Protected variant allows the caller to specify an explicit transaction ! * object rather than equating the transaction object with the thread of the ! * caller. 
In order to use this method the caller MUST observe the ! * constraint that at most one thread at a time may do work on a ! * transaction. Failure to observe this constraint can circumvent the ! * synchronization semantics and result in work continuing on a transaction ! * when it should be WAITing for a lock. ! * </p> ! * ! * @param tx ! * The transaction object. ! * ! * @param mode ! * The requested lock mode. ! * ! * @param timeout ! * the maximum time to wait in milliseconds. ! * ! * @param nanos ! * additional time, in nanoseconds range 0-999999. ! * ! * @return The granted {@link Lock}. ! * ! * @exception DeadlockException ! * if the lock request would result in a deadlock. ! * ! * @exception TimeoutException ! * if the lock could not be obtained within the specified ! * timeout period. ! * ! * @exception IllegalStateException ! * if the thread already has a granted lock on the resource ! * associated with this queue. ! * ! * @exception IllegalStateException ! * FIXME if the thread already has a pending lock request for ! * the resource associated with this queue (this exception ! * can only occur if more than one thread is doing work on ! * the same transaction at the same time - which is ! * explicitly forbidden). ! * ! * @issue [CONCURRENT-2] In order to support multiple locking granularities, ! * this method MUST examine the resource DAG as well in order to ! * determine if there is a higher level lock in progress. Further ! * locks must be granted and released in accord with the structure of ! * that resource DAG. This is coordinated by the use of the ! * "intention" modes to lock all ancestors in the resource DAG. For ! * example, when the resource DAG is a tree, the protocol to lock a ! * resource R in mode S or X is to first lock all ancestors of R in ! * that mode and then lock R. This method must handle all of that ! * transparently. ! * ! * @todo Examine the semantics when a lock is requested and has already been ! * granted. 
First, it should be impossible to request a lock that is ! * currently pending (iff transaction == thread). It then remains to ! * decide whether a lock mode may be modified by a lock request, ! * whether it is an error to request a lock when one has already been ! * granted, or whether it is allowed iff the same lock mode was ! * requested. ! * ! * @todo Add tryLock( short mode, long timeout, int nanos ) ! * ! * @todo What should the behavior be when interrupted while awaiting a lock? ! * What do the tests suites expect? Normally if something is ! * interrupted it should abort, so we could return a lock that is not ! * granted or interrupt the current thread. ! */ protected Lock lock( Object tx, short mode, long timeout, int nanos) throws DeadlockException, TimeoutException *************** *** 408,411 **** --- 412,418 ---- } + // @todo support timeouts + if(timeout!=0L) throw new UnsupportedOperationException(); + boolean changedState = false; *************** *** 423,427 **** if (lock != null) { ! synchronized (grantedGroup) { if (grantedGroup.contains(lock)) { --- 430,434 ---- if (lock != null) { ! // synchronized (grantedGroup) { if (grantedGroup.contains(lock)) { *************** *** 458,462 **** // } } // if( grantedGroup.contains( lock ) ) ! } // synchronized( grantedGroup ) } // if( lock != null ) --- 465,469 ---- // } } // if( grantedGroup.contains( lock ) ) ! // } // synchronized( grantedGroup ) } // if( lock != null ) *************** *** 465,487 **** synchronized (grantedGroup) { - synchronized (requestQueue) { ! /* ! * If there are no pending requests and the request is ! * compatible with the granted group, then grant it ! * immediately. ! */ ! if (requestQueue.size() == 0 ! && grantedGroup.isCompatibleRequest(mode)) { ! grantedGroup.add(lock, timestampFactory); ! changedState = true; ! return lock; ! } /* * Update the WAITS_FOR graph. --- 472,495 ---- synchronized (grantedGroup) { ! /* ! * If there are no pending requests and the request is ! 
* compatible with the granted group, then grant it ! * immediately. ! */ ! if (requestQueue.isEmpty() ! && grantedGroup.isCompatibleRequest(mode)) { ! grantedGroup.add(lock, timestampFactory); ! changedState = true; ! return lock; ! } + // synchronized (requestQueue) { + /* * Update the WAITS_FOR graph. *************** *** 498,502 **** changedState = true; ! } } --- 506,528 ---- changedState = true; ! // } ! ! // while(!lock.isGranted()) { ! // ! // log.info("Acquiring condition lock: "+lock); ! // lock.lock.lock(); ! // try { ! // log.info("Awaiting grant: "+lock); ! // lock.granted.await(); ! // } catch(InterruptedException ex) { ! // if(!lock.isGranted()) { ! // return lock; ! // } ! // } finally { ! // lock.lock.unlock(); ! // } ! // ! // } ! } *************** *** 594,598 **** * be granted. If the lock is not granted after the specified timeout, then * an exception is thrown. ! * * If this thread is interrupted, then we check to see if the lock was * granted. If not, and there is time remaining, then we reset the timeout --- 620,624 ---- * be granted. If the lock is not granted after the specified timeout, then * an exception is thrown. ! * <p> * If this thread is interrupted, then we check to see if the lock was * granted. If not, and there is time remaining, then we reset the timeout *************** *** 606,695 **** * * @todo Note: nanos are ignored. */ - private void block( Object tx, Lock lock, long begin, long timeout, int nanos ) { while (true) { ! try { synchronized (tx) { ! /* ! * Block the tx thread. ! */ ! long remaining = timeout ! - (System.currentTimeMillis() - begin); ! if( timeout == 0L ) { ! log.debug("blocking: resource=" + getResource() ! + ", tx=" + lock.getTransaction() ! + ", lock=" + lock); ! tx.wait(); // wait on TX. ! log.debug("resume: resource=" + getResource() ! + ", tx=" + lock.getTransaction() ! + ", lock=" + lock); ! } else if( remaining > 0L) { ! log.debug("blocking: resource=" + getResource() ! + ", tx=" + lock.getTransaction() ! 
+ ", lock=" + lock + ", timeout=" + timeout ! + ", remaining=" + remaining); ! tx.wait(remaining); // wait on TX. ! long elapsed = System.currentTimeMillis() - begin; ! log.debug("resume: resource=" + getResource() ! + ", tx=" + lock.getTransaction() ! + ", lock=" + lock + ", timeout=" + timeout ! + ", elapsed=" + elapsed); } if (grantedGroup.contains(lock)) { // Lock was granted. if (info) { ! log.info(toString()); } return; - } else { - /* - * This condition will arise when a lock is not granted, - * either due to a timeout or because the transaction - * was aborted as a result of a deadlock. - */ - throw new RuntimeException("Lock not granted"); } ! } ! } catch (InterruptedException ex) { ! log.debug("interrupted: resource=" + getResource() ! + ", tx=" + lock.getTransaction() + ", lock=" ! + lock); ! if (grantedGroup.contains(lock)) { ! // Lock was granted. ! if (info) { ! log.info(toString()); } ! return; ! } ! } ! } // while( true ) } /** ! * Release a lock. ! * ! * @param lock ! * The lock. ! * ! * @exception IllegalArgumentException ! * If the <i>lock </i> is not associated with this ! * {@link Queue}. ! * ! * @return <code>true</code> iff a granted lock was released. (Other ! * possible outcomes include releasing a pending lock request and ! * invocation on a lock which had already been released and hence ! * was neither granted nor pending). ! * ! * @todo When integrating with an MVCC strategy for ww synchronization, the ! * timestamp of the lock MUST be updated from the timestamp assigned ! * to the transaction at its "locked point". ! * ! * @todo review synchronization assumptions in this method. ! * ! * @todo Should this method be exposed more in order to support release of ! * locks associated with aborted transactions? ! */ boolean releaseLock( Lock lock ) --- 632,719 ---- * * @todo Note: nanos are ignored. + * @todo should we abort on interrupt rather than continue? 
*/ private void block( Object tx, Lock lock, long begin, long timeout, int nanos ) { + /* + * max time to wait w/o rechecking the grantedGroup. This keeps everyone + * from waiting forever if the lock was granted before this method was + * entered (a distinct possibility per lock(...) above). + */ + final long maxWait = 250; while (true) { ! final long remaining = timeout - (System.currentTimeMillis() - begin); ! while (timeout == 0L || remaining > 0L) { ! final long wait = timeout == 0L || remaining > maxWait ? maxWait ! : remaining; ! log.debug("blocking: resource=" + getResource() + ", tx=" ! + lock.getTransaction() + ", lock=" + lock ! + ", timeout=" + timeout); synchronized (tx) { ! try { // wait on TX. ! tx.wait(wait /* remaining */); ! } catch (InterruptedException ex) { ! log.debug("interrupted: lock=" + lock); ! if (grantedGroup.contains(lock)) { ! // Lock was granted. ! if (info) log.info(toString()); ! return; ! } } + final long elapsed = System.currentTimeMillis() - begin; + log.debug("resume: lock=" + lock + ", timeout=" + timeout + ", elapsed=" + elapsed); if (grantedGroup.contains(lock)) { // Lock was granted. if (info) { ! log.info("lock granted"); // toString()); } return; } ! if (requestQueue.isEmpty() && grantedGroup.isEmpty()) { ! // Error if both the request queue and granted ever ! // become empty. ! throw new AssertionError( ! "Blocked waiting but request queue and granted group are empty: " ! + lock); } ! // if (grantedGroup.isEmpty()) { ! // // Try granting requests. ! // grantRequests(); ! // } ! } // synchronized(tx) ! } // while(timeout...) ! /* ! * This condition will arise when a lock is not granted, either ! * due to a timeout or because the transaction was aborted as a ! * result of a deadlock. ! */ ! throw new RuntimeException("Lock not granted"); ! } // while( true ) } /** ! * Release a lock. ! * ! * @param lock ! * The lock. ! * ! * @exception IllegalArgumentException ! * If the <i>lock </i> is not associated with this ! 
* {@link Queue}. ! * ! * @return <code>true</code> iff a granted lock was released. (Other ! * possible outcomes include releasing a pending lock request and ! * invocation on a lock which had already been released and hence ! * was neither granted nor pending). ! * ! * @todo When integrating with an MVCC strategy for ww synchronization, the ! * timestamp of the lock MUST be updated from the timestamp assigned ! * to the transaction at its "locked point". ! * ! * @todo review synchronization assumptions in this method. ! * ! * @todo Should this method be exposed more in order to support release of ! * locks associated with aborted transactions? ! */ boolean releaseLock( Lock lock ) *************** *** 701,704 **** --- 725,730 ---- */ + final Object tx = lock.getTransaction(); + if( grantedGroup.remove( lock ) ) { *************** *** 712,717 **** Lock[] pendingLocks = requestQueue.getRequestedLocks(); for( int i=0; i<pendingLocks.length; i++ ) { ! waitsFor.removeEdge(pendingLocks[i].getTransaction(), lock ! .getTransaction()); } --- 738,742 ---- Lock[] pendingLocks = requestQueue.getRequestedLocks(); for( int i=0; i<pendingLocks.length; i++ ) { ! waitsFor.removeEdge(pendingLocks[i].getTransaction(), tx); } *************** *** 748,752 **** final boolean waiting = true; ! waitsFor.removeEdges(lock.getTransaction(), waiting ); if (info) { --- 773,777 ---- final boolean waiting = true; ! waitsFor.removeEdges(tx, waiting ); if (info) { *************** *** 835,839 **** Lock lock; ! synchronized( grantedGroup ) { if( ( lock = grantedGroup.getTxLock( tx ) ) != null ) { --- 860,864 ---- Lock lock; ! // synchronized( grantedGroup ) { if( ( lock = grantedGroup.getTxLock( tx ) ) != null ) { *************** *** 843,847 **** } ! synchronized (requestQueue) { if( ( lock = requestQueue.getTxLock( tx ) ) != null ) { --- 868,872 ---- } ! // synchronized (requestQueue) { if( ( lock = requestQueue.getTxLock( tx ) ) != null ) { *************** *** 851,857 **** } ! } ! ! 
} return null; --- 876,882 ---- } ! // } ! // ! // } return null; *************** *** 873,879 **** * @return <code>true</code> iff any requests were granted. */ - private boolean grantRequests() { // true iff one or more lock requests are granted. boolean changedState = false; --- 898,908 ---- * @return <code>true</code> iff any requests were granted. */ private boolean grantRequests() { + // There are no pending requests. + if (requestQueue.isEmpty()) { + return false; + } + // true iff one or more lock requests are granted. boolean changedState = false; *************** *** 881,890 **** try { ! synchronized (requestQueue) { ! ! // There are no pending requests. ! if (requestQueue.size() == 0) { ! return false; ! } /* --- 910,914 ---- try { ! // synchronized (requestQueue) { /* *************** *** 920,929 **** } - changedState = true; - if (!requestQueue.remove(requestedLock)) { ! throw new AssertionError(); } /* * Remove edges from the members of the granted group to --- 944,955 ---- } if (!requestQueue.remove(requestedLock)) { ! // Already granted or timed out. ! log.info("Request gone"); ! return changedState; } + changedState = true; + /* * Remove edges from the members of the granted group to *************** *** 964,968 **** } ! } // next requestedLock --- 990,1003 ---- } ! ! // log.info("Acquiring condition lock: "+requestedLock); ! // requestedLock.lock.lock(); ! // try { ! // log.info("Signaling grant of lock: "+requestedLock); ! // requestedLock.granted.signalAll(); ! // } finally { ! // requestedLock.lock.unlock(); ! // } ! } // next requestedLock *************** *** 971,975 **** } // synchronized( grantedGroup ) ! } // synchronized( requestQueue ) } finally { --- 1006,1010 ---- } // synchronized( grantedGroup ) !
// } // synchronized( requestQueue ) } finally { *************** *** 1065,1069 **** --- 1100,1106 ---- */ public int getGroupSize() { + return grantedGroup.size(); + } *************** *** 1072,1098 **** */ public int getQueueSize() { return requestQueue.size(); } /** * A human readable representation of the state of the queue. */ ! public String toString() { /* ! * Make a synchronous snapshot of the state of the queue. */ final Lock[] grantedLocks; final Lock[] requestedLocks; final short groupMode; ! synchronized( grantedGroup ) { ! synchronized( requestQueue ) { ! groupMode = grantedGroup.getMode(); ! grantedLocks = grantedGroup.getGrantedLocks(); ! requestedLocks = requestQueue.getRequestedLocks(); } } --- 1109,1169 ---- */ public int getQueueSize() { + return requestQueue.size(); + } /** * A human readable representation of the state of the queue. + * <p> + * Note: The queue will be labelled as "inconsistent" because this method + * does not make a synchronous snapshot of its state in order to avoid + * deadlocks that might arise from synchronization. */ + public String toString() { + + return toString(false); + + } ! /** ! * A human readable representation of the state of the queue. ! * <p> ! * Caution: <strong>deadlocks may arise</strong> when ! * <code>synchronous := true</code>. ! * ! * @param synchronous ! * When true, a synchronous snapshot will be made. Otherwise the ! * queue will be labelled as "inconsistent" because this method ! * did not make a synchronous snapshot of its state in order to ! * avoid deadlocks that might arise from synchronization. ! * ! * @return ! */ ! public String toString(boolean synchronous) { /* ! * Make a snapshot of the state of the queue. */ final Lock[] grantedLocks; + final Lock[] requestedLocks; final short groupMode; ! if (synchronous) { ! // Note: This can cause synchronization deadlocks!!! ! synchronized (grantedGroup) { ! synchronized (requestQueue) { ! groupMode = grantedGroup.getMode(); ! 
grantedLocks = grantedGroup.getGrantedLocks(); ! requestedLocks = requestQueue.getRequestedLocks(); ! } } + } else { + + groupMode = grantedGroup.getMode(); + grantedLocks = grantedGroup.getGrantedLocks(); + requestedLocks = requestQueue.getRequestedLocks(); } *************** *** 1104,1108 **** StringBuffer sb = new StringBuffer(); ! sb.append("Queue(" + resource + ") "); if( grantedLocks.length > 0 ) { --- 1175,1179 ---- StringBuffer sb = new StringBuffer(); ! sb.append("Queue["+(synchronous?"":"in")+"consistent](" + resource + ") "); if( grantedLocks.length > 0 ) { --- RequestQueue.java DELETED --- Index: LockContextManager.java =================================================================== RCS file: /cvsroot/cweb/concurrent/src/java/org/CognitiveWeb/concurrent/locking/LockContextManager.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** LockContextManager.java 7 Apr 2006 16:50:04 -0000 1.1 --- LockContextManager.java 2 Oct 2007 19:45:26 -0000 1.2 *************** *** 92,96 **** * class. */ ! protected final Set lockContexts; /** --- 92,96 ---- * class. */ ! protected final Set<LockContext> lockContexts; /** *************** *** 134,155 **** waitsFor = new TxDag( capacity ); ! lockContexts = new HashSet( capacity ); } /** ! * Create a new {@link LockContext}. By default the {@link LockContext} ! * will be bound to the thread in which this request was made. ! * ! * @exception IllegalStateException ! * If the thread is already bound to another ! * {@link LockContext}. ! * ! * @exception IllegalStateException ! * If the maximum multi-programming level would be exceeded. ! * ! * @return The new {@link LockContext}. ! */ ! public LockContext createLockContext() { --- 134,154 ---- waitsFor = new TxDag( capacity ); ! lockContexts = new HashSet<LockContext>( capacity ); } /** ! * Create a new {@link LockContext}. By default the {@link LockContext} ! * initially will be bound to the thread in which this request was made. ! * ! 
* @exception IllegalStateException ! * If the current thread is already bound to another ! * {@link LockContext}. ! * ! * @exception IllegalStateException ! * If the maximum multi-programming level would be exceeded. ! * ! * @return The new {@link LockContext}. ! */ public LockContext createLockContext() { *************** *** 205,209 **** } ! } --- 204,218 ---- } ! ! /* ! * Release the vertex (if any) in the WAITS_FOR graph. ! * ! * Note: A vertex is created iff a dependency chain is established. ! * Therefore it is possible for a transaction to obtain a lock ! * without a vertex being created for that transaction. Hence it is ! * Ok if this method returns [false]. ! */ ! waitsFor.releaseVertex(lockContext); ! } *************** *** 324,329 **** * current thread is not bound to any {@link LockContext}. */ ! final private static ThreadLocal lockContext = new ThreadLocal() { ! protected synchronized Object initialValue() { return null; } --- 333,338 ---- * current thread is not bound to any {@link LockContext}. */ ! final private static ThreadLocal<LockContext> lockContext = new ThreadLocal<LockContext>() { ! protected synchronized LockContext initialValue() { return null; } *************** *** 336,342 **** */ final static synchronized public LockContext getCurrentThread() { ! return (LockContext) lockContext.get(); } /** * Release the {@link LockContext} for the current thread. --- 345,368 ---- */ final static synchronized public LockContext getCurrentThread() { ! return lockContext.get(); } + /** + * <p> + * Forces the release of the {@link LockContext} object owned by the + * current thread (if any). + * </p> + * <p> + * Note: This is currently used by the test cases to "clean" the main + * thread in which the unit tests are executing between tests. If we + * don't clear the thread then subsequent unit tests can fail since the + * initial conditions for those tests assume that the main thread does + * not own a {@link LockContext}. 
+ * </p> + */ + final static synchronized void clearCurrentThread() { + lockContext.set(null); + } + /** * Release the {@link LockContext} for the current thread. *************** *** 376,396 **** /** - * <p> - * Forces the release of the {@link LockContext} object owned by the - * current thread (if any). - * </p> - * <p> - * Note: This is currently used by the test cases to "clean" the main - * thread in which the unit tests are executing between tests. If we - * don't clear the thread then subsequent unit tests can fail since the - * initial conditions for those tests assume that the main thread does - * not own a {@link LockContext}. - * </p> - */ - final static synchronized void clearCurrentThread() { - lockContext.set(null); - } - - /** * The {@link LockContextManager}. */ --- 402,405 ---- *************** *** 404,415 **** * was thrown at the time that the lock was requested. */ ! private final Set locks = new HashSet(); /** * The {@link LockContext} manager. */ - public LockContextManager getLockContextManager() { ! return lockContextManager; } --- 413,425 ---- * was thrown at the time that the lock was requested. */ ! private final Set<Queue> locks = new HashSet<Queue>(); /** * The {@link LockContext} manager. */ public LockContextManager getLockContextManager() { ! ! return lockContextManager; ! } *************** *** 509,513 **** assertCurrentThread(); ! Queue queue = lockContextManager.getQueue( resource, true ); if( queue == null ) { --- 519,523 ---- assertCurrentThread(); ! Queue queue = lockContextManager.getQueue( resource, false ); if( queue == null ) { |