From: Bryan T. <tho...@us...> - 2007-10-02 19:45:36
|
Update of /cvsroot/cweb/concurrent/src/test/org/CognitiveWeb/concurrent/schedule In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv6007/src/test/org/CognitiveWeb/concurrent/schedule Modified Files: Tx.java Condition.java Action.java Schedule.java Log Message: Working on some edge cases in which deadlocks arise among threads that are blocked and waiting on a resource to be released but in which the WAITS_FOR graph does NOT have any cycles. Also fixed a bug in which vertices of the WAITS_FOR graph were not being released by LockContextManager. Index: Action.java =================================================================== RCS file: /cvsroot/cweb/concurrent/src/test/org/CognitiveWeb/concurrent/schedule/Action.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** Action.java 17 Mar 2006 14:25:29 -0000 1.3 --- Action.java 2 Oct 2007 19:45:26 -0000 1.4 *************** *** 70,76 **** final private String name; ! final private List preConditions = new LinkedList(); ! final private List postConditions = new LinkedList(); /** --- 70,76 ---- final private String name; ! final private List<Condition> preConditions = new LinkedList<Condition>(); ! final private List<Condition> postConditions = new LinkedList<Condition>(); /** Index: Tx.java =================================================================== RCS file: /cvsroot/cweb/concurrent/src/test/org/CognitiveWeb/concurrent/schedule/Tx.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** Tx.java 17 Mar 2006 14:25:29 -0000 1.3 --- Tx.java 2 Oct 2007 19:45:26 -0000 1.4 *************** *** 49,228 **** import org.CognitiveWeb.concurrent.locking.TestQueueConcurrentTx; - /** ! * Test helper conflates a transactions with a thread. The {@link Runnable} ! * target executions the operations of the transaction as defined by a ! * {@link Schedule}. * ! * @author thompsonbry * ! * @see Schedule */ - - public class Tx extends Thread - { ! /** ! * Initially false and set to [true] when the transaction completes. ! * This field is checked by the {@link Schedule}. ! */ ! ! volatile boolean done = false; ! ! /** ! * Initially <code>null</code>. If a pre-condition, post-condition or ! * action fails, then this is set to the thrown exception. This field is ! * checked by the {@link Schedule}. ! */ ! volatile Throwable exception = null; ! ! /** ! * The next/current action to be run by the transaction. This field is ! * set by the {@link Schedule}and cleared by the {@link Tx}as each ! * action is successfully executed. The field is NOT cleared if an ! * action is blocked or if an exception is thrown. ! */ ! ! volatile Action action = null; ! ! /** ! * The schedule responsible for assigning actions to this transaction. ! */ ! final Schedule schedule; ! /** ! * Counter of the #of actions executed. ! */ ! private int nactions = 0; ! ! /** ! * Create a new transaction. ! * ! * @param schedule ! * The schedule which will run this transaction. ! * ! * @param name ! * The name of the transaction. ! */ ! ! Tx( Schedule schedule, String name ) { ! super( name ); ! if( schedule == null ) { ! throw new IllegalArgumentException(); } ! this.schedule = schedule; ! setDaemon(true); // can exit while this is running. ! start(); // start transaction -- it will wait on the Schedule. ! } ! ! public String toString() { ! return getName(); ! } ! ! /** ! * Run the transaction. Actions are executed as they set set on ! * the transaction by the schedule. ! * ! * @see Schedule#run() ! */ ! synchronized public void run() { ! while (true) { ! waitOnSchedule(); ! if (action != null) { ! runAction(); ! } ! } // while(true) ! } // run() ! /** ! * If {@link #action}!= null, then executes the pre-conditions, the ! * action, and the post-conditions defined for that action and clears ! * the {@link #action}so that another action may be tasked to this ! * transaction. ! * ! * @exception IllegalStateException ! * If the transaction is complete [done == true]. ! * ! * @exception IllegalStateException ! * If the transaction has thrown an exception [exception != ! * null]. ! */ ! ! synchronized private void runAction() ! { ! final long begin = System.currentTimeMillis(); ! try { ! // Make sure that the transaction is still valid. ! if (done || exception != null) { ! // Transaction may not execute more actions. ! throw new IllegalStateException("done/error: tx=" + this ! + ", done=" + done + ", ex=" + exception); ! } ! // pre-conditions. ! TestQueueConcurrentTx.log.debug("preConditions: tx=" + this + ", action=" + action); ! action.runPreConditions(); ! // action. ! TestQueueConcurrentTx.log.info("action: tx=" + this + ", action=" + action); ! action.run(); ! // post-condition. ! TestQueueConcurrentTx.log.debug("postConditions: tx=" + this + ", action=" + action); ! action.runPostConditions(); ! // success: Clear [action]. ! TestQueueConcurrentTx.log.debug("success: tx=" + this + ", action=" + action); ! } catch (Throwable t) { ! exception = t; ! TestQueueConcurrentTx.log.error("tx=" + this, t); ! } ! finally { ! action = null; ! nactions++; ! final long elapsed = System.currentTimeMillis() - begin; ! TestQueueConcurrentTx.log.info( "action: elapsed="+elapsed ); } } ! /** ! * Hand off execution to the schedule. ! */ ! synchronized private void waitOnSchedule() ! { ! if( action != null ) { ! /* ! * Do NOT wait if an action is assigned. This can happen on the ! * first action if {@link Schedule#run()}begins executing ! * actions before {@link Tx#run()}starts to execute. In this ! * case the Schedule will hold the monitor on the Tx and set the ! * action before the Tx enters this method for the first time. ! * After that this SHOULD NOT happen if things are synchronizing ! * properly. ! */ ! if( nactions > 0 ) { ! TestQueueConcurrentTx.log.warn( "Action already assigned: tx="+this+", action="+action); ! } ! return; } ! // Wait. ! TestQueueConcurrentTx.log.info("waiting: tx=" ! + this ! + (done ? ", done" : "") ! + (exception == null ? "" : ", ex=" ! + exception.getMessage())); ! final long begin = System.currentTimeMillis(); ! try { ! notifyAll(); // notify - will notify schedule waiting on tx. ! // schedule.thread.interrupt(); ! wait(); // hand off execution to the schedule. ! long elapsed = System.currentTimeMillis() - begin; ! TestQueueConcurrentTx.log.info("resume: tx=" + this + ", elapsed=" + elapsed); ! } catch (InterruptedException ex) { ! long elapsed = System.currentTimeMillis() - begin; ! TestQueueConcurrentTx.log.info( "interrupted: tx=" + this + ", elapsed=" ! + elapsed); ! } } ! ! } \ No newline at end of file --- 49,226 ---- import org.CognitiveWeb.concurrent.locking.TestQueueConcurrentTx; /** ! * Test helper conflates a transactions with a thread. The {@link Runnable} ! * target executions the operations of the transaction as defined by a ! * {@link Schedule}. ! * ! * @author thompsonbry ! * ! * @see Schedule ! */ ! public class Tx extends Thread { ! ! /** ! * Initially false and set to [true] when the transaction completes. ! * This field is checked by the {@link Schedule}. ! */ ! ! volatile boolean done = false; ! ! /** ! * Initially <code>null</code>. If a pre-condition, post-condition or ! * action fails, then this is set to the thrown exception. This field is ! * checked by the {@link Schedule}. ! */ ! ! volatile Throwable exception = null; ! ! /** ! * The next/current action to be run by the transaction. This field is ! * set by the {@link Schedule}and cleared by the {@link Tx}as each ! * action is successfully executed. The field is NOT cleared if an ! * action is blocked or if an exception is thrown. ! */ ! ! volatile Action action = null; ! ! /** ! * The schedule responsible for assigning actions to this transaction. ! */ ! ! final Schedule schedule; ! ! /** ! * Counter of the #of actions executed. ! */ ! private int nactions = 0; ! ! /** ! * Create a new transaction. * ! * @param schedule ! * The schedule which will run this transaction. * ! * @param name ! * The name of the transaction. */ ! Tx(Schedule schedule, String name) { ! super(name); ! if (schedule == null) { ! throw new IllegalArgumentException(); ! } ! this.schedule = schedule; ! setDaemon(true); // can exit while this is running. ! start(); // start transaction -- it will wait on the Schedule. ! } ! public String toString() { ! return getName(); ! } ! /** ! * Run the transaction. Actions are executed as they set set on ! * the transaction by the schedule. ! * ! * @see Schedule#run() ! */ ! synchronized public void run() { ! while (true) { ! waitOnSchedule(); ! if (action != null) { ! runAction(); } ! } // while(true) ! } // run() ! /** ! * If {@link #action}!= null, then executes the pre-conditions, the ! * action, and the post-conditions defined for that action and clears ! * the {@link #action}so that another action may be tasked to this ! * transaction. ! * ! * @exception IllegalStateException ! * If the transaction is complete [done == true]. ! * ! * @exception IllegalStateException ! * If the transaction has thrown an exception [exception != ! * null]. ! */ ! synchronized private void runAction() { ! final long begin = System.currentTimeMillis(); ! try { ! // Make sure that the transaction is still valid. ! if (done || exception != null) { ! // Transaction may not execute more actions. ! throw new IllegalStateException("done/error: tx=" + this ! + ", done=" + done + ", ex=" + exception); } + // pre-conditions. + TestQueueConcurrentTx.log.debug("preConditions: tx=" + this + + ", action=" + action); + action.runPreConditions(); + // action. + TestQueueConcurrentTx.log.info("action: tx=" + this + ", action=" + + action); + action.run(); + // post-condition. + TestQueueConcurrentTx.log.debug("postConditions: tx=" + this + + ", action=" + action); + action.runPostConditions(); + // success: Clear [action]. + TestQueueConcurrentTx.log.debug("success: tx=" + this + ", action=" + + action); + } catch (Throwable t) { + exception = t; + TestQueueConcurrentTx.log.error("tx=" + this, t); + } finally { + action = null; + nactions++; + final long elapsed = System.currentTimeMillis() - begin; + TestQueueConcurrentTx.log.info("action: elapsed=" + elapsed); } + } ! /** ! * Hand off execution to the schedule. ! */ ! synchronized private void waitOnSchedule() { ! if (action != null) { ! /* ! * Do NOT wait if an action is assigned. This can happen on the ! * first action if {@link Schedule#run()} begins executing ! * actions before {@link Tx#run()} starts to execute. In this ! * case the Schedule will hold the monitor on the Tx and set the ! * action before the Tx enters this method for the first time. ! * After that this SHOULD NOT happen if things are synchronizing ! * properly. ! */ ! if (nactions > 0) { ! TestQueueConcurrentTx.log.warn("Action already assigned: tx=" ! + this + ", action=" + action); } ! return; } ! // Wait. ! TestQueueConcurrentTx.log.info("waiting: tx=" + this ! + (done ? ", done" : "") ! + (exception == null ? "" : ", ex=" + exception.getMessage())); ! final long begin = System.currentTimeMillis(); ! try { ! notifyAll(); // notify - will notify schedule waiting on tx. ! // schedule.thread.interrupt(); ! wait(); // hand off execution to the schedule. ! long elapsed = System.currentTimeMillis() - begin; ! TestQueueConcurrentTx.log.info("resume: tx=" + this + ", elapsed=" ! + elapsed); ! } catch (InterruptedException ex) { ! long elapsed = System.currentTimeMillis() - begin; ! TestQueueConcurrentTx.log.info("interrupted: tx=" + this ! + ", elapsed=" + elapsed); ! } ! } ! ! } \ No newline at end of file Index: Schedule.java =================================================================== RCS file: /cvsroot/cweb/concurrent/src/test/org/CognitiveWeb/concurrent/schedule/Schedule.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** Schedule.java 17 Mar 2006 14:25:29 -0000 1.3 --- Schedule.java 2 Oct 2007 19:45:26 -0000 1.4 *************** *** 56,453 **** import org.CognitiveWeb.concurrent.locking.action.LockAction; ! /** ! * A schedule of operations by concurrent transactions. The schedule is a ! * discrete timeline. Each point on the timeline is defined by an action. ! * Each action is paired with the transaction which will execute that ! * action. Actions execute in the thread associated with their transaction, ! * so actions never block the main thread. Actions may be associated with ! * pre- and/or post-conditions. Actions are executed atomically unless the ! * action blocks. An action MUST declare whether or not it will ! * {@link Action#block}. If the action causes an exception to be thrown, ! * then the schedule will halt and the exception to be thrown out of the ! * {@link #run()} method. ! * <p> ! * Once the action has been successfully executed by the transaction, the ! * thread for that transaction will wait until the schedule sets the next ! * action for that transaction and notifies the transaction. If an action ! * blocks, then the next action in the schedule is executed and the duration ! * of the blocked action will extend until the transaction is unblock. ! * Normally blocking occurs due to a specific combination of {@link LockAction}s ! * and unblocking results when appropriate {@link LockAction}s are released. If a ! * transaction is blocked when its next action is reached in the schedule, ! * then an error is reported (since it is impossible for the transaction to ! * execute any further actions while it is blocked). Once all actions have ! * been executed, the transactions are scanned to verify that none are ! * blocked and that all transactions are complete. An error message is ! * logged if these conditions are violated. ! * <p> ! * Aborts must be executed in the main thread so that blocked transactions ! * may be aborted. ! * <p> ! * An event log is recorded using the {@link #log}. ! * <p> * ! * @author thompsonbry * ! * @see Tx */ - - public class Schedule implements Runnable - { - - /** - * The set of declared transactions. - */ - Set transactions = new HashSet(); - - /** - * The sequence of actions. - */ - List actions = new LinkedList(); ! /** ! * The thread in which the schedule is running. This is initially ! * <code>null</code> and is set when the schedule is {@link #run()}. ! */ ! Thread thread = null; ! ! /** ! * Create a new schedule. ! */ ! public Schedule() ! { ! } ! /** ! * Create a transaction for use with this {@link Schedule}. ! * ! * @param name The name of the transaction. ! * ! * @return The transaction. ! */ ! ! public Tx createTx( String name ) { ! Tx tx = new Tx( this, name ); ! transactions.add( tx ); ! return tx; ! } ! /** ! * Add an action to the schedule. ! * ! * @param action ! * The action. ! */ ! ! public void add( Action action ) { ! if( action == null ) { ! throw new IllegalArgumentException(); ! } ! Tx tx = action.getTx(); ! if (!transactions.contains(tx)) { ! throw new IllegalArgumentException( ! "action belongs to a transaction created by a different schedule."); ! } ! actions.add( action ); } ! ! /** ! * Return iterator over the transactions known to the schedule. ! */ ! public Iterator getTransactions() { ! return transactions.iterator(); } ! /** ! * Return the #of transactions known to the schedule. ! */ ! public int getTransactionCount() { ! return transactions.size(); ! } ! /** ! * Return the #of actions scheduled for the specified transaction. ! * ! * @param tx ! * The transaction. ! * @return The #of actions scheduled for that transaction. ! */ ! public int getActionCount( Tx tx ) { ! if( tx == null ) { ! throw new IllegalArgumentException(); ! } ! int count = 0; ! Iterator itr = actions.iterator(); ! while( itr.hasNext() ) { ! Action action = (Action) itr.next(); ! if( action.getTx() == tx ) { ! count++; ! } } - return count; } ! ! /** ! * Run all actions. ! * ! * @exception RuntimeException ! * If an error has occurred either in the scheduler state ! * machine, in the execution of an action, or in the pre- ! * or post-conditions for an action. ! */ ! ! public void run() ! { ! thread = Thread.currentThread(); ! ! TestQueueConcurrentTx.log.info("Will run: " + actions.size() + " actions defined for " ! + transactions.size() + " transactions"); ! ! Iterator itr = actions.iterator(); ! ! int i = 0; ! ! while( itr.hasNext() ) { ! ! Action action = (Action) itr.next(); ! ! TestQueueConcurrentTx.log.info( "Action#="+i+": "+action ); ! ! Tx tx = action.getTx(); ! /* ! * Set the action to be executed and setup a timeout. If the ! * timeout is exceeded, then the action blocked. This should ! * only happen for actions which declare that they will block. ! * Blocking in any other case causes an exception to be thrown. ! * Failure to block when a the action was expected to block also ! * causes an exception to be thrown. ! * ! * The schedule runs in the main thread. Synchronization is on ! * the individual transactions. Normally one transaction runs at ! * a time and only when the schedule is halted. The exception ! * occurs when a transaction which was blocked becomes ! * unblocked, e.g., by sending it an interrupt or notifying the ! * Tx object. ! * ! * @todo schedule#run() Is there a way to detect a block without ! * relying on a long timeout? (Short timeouts are sometimes too ! * short and give a false positive.) ! */ ! synchronized( tx ) { ! checkTx( tx ); ! TestQueueConcurrentTx.log.info("Setting action on tx: tx="+tx+", action="+action); ! tx.action = action; // set action to be executed. ! final long timeout = 1000L; // ms. ! long elapsed; ! // this.notifyAll(); // notify all transactions. ! // synchronized (this) { ! final long begin = System.currentTimeMillis(); ! try { ! TestQueueConcurrentTx.log.debug("notify: tx="+tx); ! // tx.interrupt(); // wake up transaction. ! tx.notifyAll(); // notify tx. ! tx.wait(timeout); // transaction will attempt to run. ! elapsed = System.currentTimeMillis() - begin; ! TestQueueConcurrentTx.log.debug("timeout: tx="+tx+", elapsed=" + elapsed); ! } catch (InterruptedException ex) { ! elapsed = System.currentTimeMillis() - begin; ! TestQueueConcurrentTx.log.debug("interrupted: elapsed=" + elapsed); ! // throw new RuntimeException("???", ex); ! } ! // } ! if (tx.exception != null) { /* ! * The action caused an exception to be thrown. This ! * halts the execution of the schedule and the exception ! * is reported back to the caller. */ ! throw new RuntimeException("failure: tx=" + tx, ! tx.exception); } ! if (action.blocks) { ! if (elapsed < timeout) { ! /* ! * The transaction failed to blocked when it was ! * supposed to block. ! */ ! throw new RuntimeException( ! "action failed to block: tx=" + tx ! + ", action=" + action ! + ", elapsed=" + elapsed); ! } ! } else { ! if (elapsed >= timeout) { ! /* ! * The transaction blocked when it was not supposed ! * to block. ! */ ! throw new RuntimeException( ! "action blocked: tx=" + tx ! + ", action=" + action ! + ", elapsed=" + elapsed); ! } } - } - - i++; - - } - TestQueueConcurrentTx.log.info( "Schedule done."); - - /* - * Scan transactions and report any which are in an error condition - * [exception != null], blocked [action != null ], or failed to - * complete [done != true]. - */ - - itr = transactions.iterator(); - - while( itr.hasNext() ) { - - Tx tx = (Tx) itr.next(); - - if( tx.exception != null ) { - - TestQueueConcurrentTx.log.error( "error: tx="+tx, tx.exception ); - - } else if( tx.action != null ) { - - TestQueueConcurrentTx.log.error( "blocked: tx="+tx+" on action="+tx.action ); - - } else if( ! tx.done ) { - - TestQueueConcurrentTx.log.warn( "active: tx="+tx ); - - } - } ! ! // /* ! // * Transactions may still be executing at this point. Therefore we ! // * scan through the transaction set repeatedly, removing any ! // * transactions which have completed execution. ! // * ! // * If any transaction has set its [exception] field, then it has ! // * failed and we will throw an exception out of the test case. ! // * Otherwise if the [action] field is cleared then the transaction ! // * has finished executing (since we are not scheduling more ! // * actions). If the [done] flag was NOT set, then we log a warning ! // * since the test harness did not "commit" that transaction. ! // */ ! // ! // while( true ) { ! // ! // if( transactions.size() == 0 ) { ! // ! // // All transactions have finished. ! // ! // break; ! // ! // } ! // ! // // Scan remaining transactions. ! // ! // itr = transactions.iterator(); ! // ! // while( itr.hasNext() ) { ! // ! // Tx tx = (Tx) itr.next(); ! // ! // if( tx.exception != null ) { ! // ! // /* ! // * This transaction failed, so thrown the exception out ! // * to the test harness. ! // */ ! // ! // throw new RuntimeException("failure: tx=" + tx, ! // tx.exception); ! // ! // } ! // ! // if( tx.action == null ) { ! // ! // /* ! // * The transaction is not executing anything. ! // */ ! // ! // if( ! tx.done ) { ! // ! // /* ! // * Log a warning since the test harness did not ! // * explicitly commit the transaction. (This is not ! // * an error since we are not really required to ! // * commit transactions during tests of the ! // * concurrency control mechanism.) ! // */ ! // ! // log.warn( "transaction was not committed: tx="+tx ); ! // ! // } ! // ! // // Remove from the set of remaining transactions. ! // ! // itr.remove(); ! // ! // } ! // ! // } ! // ! // } ! } ! ! /** ! * Check some pre-conditions before tasking a transaction to execute an ! * action. ! * ! * @exception IllegalStateException ! * If the transaction is complete [{@link Tx#done}== ! * true]. ! * ! * @exception IllegalStateException ! * If the transaction has thrown an exception [ ! * {@link Tx#exception}!= null] ! * ! * @exception IllegalStateException ! * If the transaction is blocked [{@link Tx#action}!= ! * null]. */ ! ! private void checkTx( Tx tx ) ! throws IllegalStateException ! { ! if (tx.done) { ! throw new IllegalStateException("transaction is done: tx=" + tx); } ! if (tx.action != null) { ! /* ! * This case arises when a lock was not granted for a ! * transaction and the schedule reaches the next action for that ! * transaction. This is either a problem with the test case or a ! * failure in the locking system. ! */ ! if (tx.exception != null) { ! // Special case when a block transaction throws an ! // exception. ! IllegalStateException ex = new IllegalStateException( ! "transaction is blocked: tx=" + tx + " by action=" ! + tx.action); ! ex.initCause(tx.exception); ! throw ex; ! } else { ! throw new IllegalStateException( ! "transaction is blocked: tx=" + tx + " by action=" ! + tx.action); ! } ! } } ! ! } \ No newline at end of file --- 56,456 ---- import org.CognitiveWeb.concurrent.locking.action.LockAction; + /** + * A schedule of operations by concurrent transactions. The schedule is a + * discrete timeline. Each point on the timeline is defined by an {@link Action}. + * Each action is paired with the {@link Tx transaction} which will execute that + * action. Actions execute in the thread associated with their transaction, so + * actions never block the main thread. Actions may be associated with pre- + * and/or post-{@link Condition}s. Actions are executed atomically unless the + * action blocks. An action MUST declare whether or not it will + * {@link Action#block}. If the action causes an exception to be thrown, then + * the schedule will halt and the exception to be thrown out of the + * {@link #run()} method. + * <p> + * Once the action has been successfully executed by the transaction, the thread + * for that transaction will wait until the schedule sets the next action for + * that transaction and notifies the transaction. If an action blocks, then the + * next action in the schedule is executed and the duration of the blocked + * action will extend until the transaction is unblocked. Normally blocking + * occurs due to a specific combination of {@link LockAction}s and unblocking + * results when appropriate {@link LockAction}s are released. If a transaction + * is blocked when its next action is reached in the schedule, then an error is + * reported (since it is impossible for the transaction to execute any further + * actions while it is blocked). Once all actions have been executed, the + * transactions are scanned to verify that none are blocked and that all + * transactions are complete. An error message is logged if these conditions are + * violated. + * <p> + * Aborts must be executed in the main thread so that blocked transactions may + * be aborted. + * <p> + * An event log is recorded using {@link #log}. + * <p> + * + * @author thompsonbry + * + * @see Tx + * @see Action + */ + public class Schedule implements Runnable { + + /** + * The set of declared transactions. + */ + Set<Tx> transactions = new HashSet<Tx>(); ! /** ! * The sequence of actions. ! */ ! List<Action> actions = new LinkedList<Action>(); ! ! /** ! * The thread in which the schedule is running. This is initially ! * <code>null</code> and is set when the schedule is {@link #run()}. ! */ ! Thread thread = null; ! ! /** ! * Create a new schedule. ! */ ! public Schedule() { ! } ! ! /** ! * Create a transaction for use with this {@link Schedule}. * ! * @param name The name of the transaction. * ! * @return The transaction. */ ! public Tx createTx(String name) { ! Tx tx = new Tx(this, name); ! transactions.add(tx); ! return tx; ! } ! /** ! * Add an action to the schedule. ! * ! * @param action ! * The action. ! */ ! public void add(Action action) { ! if (action == null) { ! throw new IllegalArgumentException(); } ! Tx tx = action.getTx(); ! if (!transactions.contains(tx)) { ! throw new IllegalArgumentException( ! "action belongs to a transaction created by a different schedule."); } + actions.add(action); + } ! /** ! * Return iterator over the transactions known to the schedule. ! */ ! public Iterator getTransactions() { ! return transactions.iterator(); ! } ! /** ! * Return the #of transactions known to the schedule. ! */ ! public int getTransactionCount() { ! return transactions.size(); ! } ! ! /** ! * Return the #of actions scheduled for the specified transaction. ! * ! * @param tx ! * The transaction. ! * @return The #of actions scheduled for that transaction. ! */ ! public int getActionCount(Tx tx) { ! if (tx == null) { ! throw new IllegalArgumentException(); ! } ! int count = 0; ! Iterator itr = actions.iterator(); ! while (itr.hasNext()) { ! Action action = (Action) itr.next(); ! if (action.getTx() == tx) { ! count++; } } ! return count; ! } ! /** ! * Run all actions. ! * ! * @exception RuntimeException ! * If an error has occurred either in the scheduler state ! * machine, in the execution of an action, or in the pre- ! * or post-conditions for an action. ! */ ! public void run() { ! thread = Thread.currentThread(); ! ! TestQueueConcurrentTx.log.info("Will run: " + actions.size() ! + " actions defined for " + transactions.size() ! + " transactions"); ! ! Iterator itr = actions.iterator(); ! ! int i = 0; ! ! while (itr.hasNext()) { ! ! Action action = (Action) itr.next(); ! ! TestQueueConcurrentTx.log.info("Action#=" + i + ": " + action); ! ! Tx tx = action.getTx(); ! ! /* ! * Set the action to be executed and setup a timeout. If the ! * timeout is exceeded, then the action blocked. This should ! * only happen for actions which declare that they will block. ! * Blocking in any other case causes an exception to be thrown. ! * Failure to block when a the action was expected to block also ! * causes an exception to be thrown. ! * ! * The schedule runs in the main thread. Synchronization is on ! * the individual transactions. Normally one transaction runs at ! * a time and only when the schedule is halted. The exception ! * occurs when a transaction which was blocked becomes ! * unblocked, e.g., by sending it an interrupt or notifying the ! * Tx object. ! * ! * @todo schedule#run() Is there a way to detect a block without ! * relying on a long timeout? (Short timeouts are sometimes too ! * short and give a false positive.) ! */ ! ! synchronized (tx) { ! checkTx(tx); ! TestQueueConcurrentTx.log.info("Setting action on tx: tx=" + tx ! + ", action=" + action); ! tx.action = action; // set action to be executed. ! final long timeout = 1000L; // ms. ! long elapsed; ! // this.notifyAll(); // notify all transactions. ! // synchronized (this) { ! final long begin = System.currentTimeMillis(); ! try { ! TestQueueConcurrentTx.log.debug("notify: tx=" + tx); ! // tx.interrupt(); // wake up transaction. ! tx.notifyAll(); // notify tx. ! /* ! * Note: the timeout is "more or less" so we add 10% ! * in an attempt to convince wait(long) to wait for ! * at least timeout ms. ! */ ! final long requestedTimeout = (long) (timeout + timeout * .10); ! tx.wait(requestedTimeout); // transaction will attempt to run. ! elapsed = System.currentTimeMillis() - begin; ! TestQueueConcurrentTx.log.debug("timeout: tx=" + tx ! + ", elapsed=" + elapsed); ! } catch (InterruptedException ex) { ! elapsed = System.currentTimeMillis() - begin; ! TestQueueConcurrentTx.log.debug("interrupted: elapsed=" ! + elapsed); ! // throw new RuntimeException("???", ex); ! } ! // } ! if (tx.exception != null) { ! /* ! * The action caused an exception to be thrown. This ! * halts the execution of the schedule and the exception ! * is reported back to the caller. ! */ ! throw new RuntimeException("failure: tx=" + tx, ! tx.exception); ! } ! if (action.blocks) { ! if (elapsed < timeout) { /* ! * The transaction failed to blocked when it was ! * supposed to block. */ ! throw new RuntimeException( ! "action failed to block: tx=" + tx ! + ", action=" + action + ", elapsed=" ! + elapsed); } ! } else { ! if (elapsed >= timeout) { ! /* ! * The transaction blocked when it was not supposed ! * to block. ! */ ! throw new RuntimeException("action blocked: tx=" + tx ! + ", action=" + action + ", elapsed=" + elapsed); } } } ! ! i++; ! } ! ! TestQueueConcurrentTx.log.info("Schedule done."); ! ! /* ! * Scan transactions and report any which are in an error condition ! * [exception != null], blocked [action != null ], or failed to ! * complete [done != true]. */ ! ! itr = transactions.iterator(); ! ! while (itr.hasNext()) { ! ! Tx tx = (Tx) itr.next(); ! ! if (tx.exception != null) { ! ! TestQueueConcurrentTx.log ! .error("error: tx=" + tx, tx.exception); ! ! } else if (tx.action != null) { ! ! TestQueueConcurrentTx.log.error("blocked: tx=" + tx ! + " on action=" + tx.action); ! ! } else if (!tx.done) { ! ! TestQueueConcurrentTx.log.warn("active: tx=" + tx); ! } ! } ! ! // /* ! // * Transactions may still be executing at this point. Therefore we ! // * scan through the transaction set repeatedly, removing any ! // * transactions which have completed execution. ! // * ! // * If any transaction has set its [exception] field, then it has ! // * failed and we will throw an exception out of the test case. ! // * Otherwise if the [action] field is cleared then the transaction ! // * has finished executing (since we are not scheduling more ! // * actions). If the [done] flag was NOT set, then we log a warning ! // * since the test harness did not "commit" that transaction. ! // */ ! // ! // while( true ) { ! // ! // if( transactions.size() == 0 ) { ! // ! // // All transactions have finished. ! // ! // break; ! // ! // } ! // ! // // Scan remaining transactions. ! // ! // itr = transactions.iterator(); ! // ! // while( itr.hasNext() ) { ! // ! // Tx tx = (Tx) itr.next(); ! // ! // if( tx.exception != null ) { ! // ! // /* ! // * This transaction failed, so thrown the exception out ! // * to the test harness. ! // */ ! // ! // throw new RuntimeException("failure: tx=" + tx, ! // tx.exception); ! // ! // } ! // ! // if( tx.action == null ) { ! // ! // /* ! // * The transaction is not executing anything. ! // */ ! // ! // if( ! tx.done ) { ! // ! // /* ! // * Log a warning since the test harness did not ! // * explicitly commit the transaction. (This is not ! // * an error since we are not really required to ! // * commit transactions during tests of the ! // * concurrency control mechanism.) ! // */ ! // ! // log.warn( "transaction was not committed: tx="+tx ); ! // ! // } ! // ! // // Remove from the set of remaining transactions. ! // ! // itr.remove(); ! // ! // } ! // ! // } ! // ! // } ! ! } ! ! /** ! * Check some pre-conditions before tasking a transaction to execute an ! * action. ! * ! * @exception IllegalStateException ! * If the transaction is complete [{@link Tx#done}== ! * true]. ! * ! * @exception IllegalStateException ! * If the transaction has thrown an exception [ ! * {@link Tx#exception}!= null] ! * ! * @exception IllegalStateException ! * If the transaction is blocked [{@link Tx#action}!= ! * null]. ! */ ! ! private void checkTx(Tx tx) throws IllegalStateException { ! if (tx.done) { ! throw new IllegalStateException("transaction is done: tx=" + tx); ! } ! if (tx.action != null) { ! /* ! * This case arises when a lock was not granted for a ! * transaction and the schedule reaches the next action for that ! * transaction. This is either a problem with the test case or a ! * failure in the locking system. ! */ ! if (tx.exception != null) { ! // Special case when a block transaction throws an ! // exception. ! IllegalStateException ex = new IllegalStateException( ! "transaction is blocked: tx=" + tx + " by action=" ! + tx.action); ! ex.initCause(tx.exception); ! throw ex; ! } else { ! throw new IllegalStateException("transaction is blocked: tx=" ! + tx + " by action=" + tx.action); ! } ! } ! } ! ! } Index: Condition.java =================================================================== RCS file: /cvsroot/cweb/concurrent/src/test/org/CognitiveWeb/concurrent/schedule/Condition.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** Condition.java 17 Mar 2006 14:25:29 -0000 1.3 --- Condition.java 2 Oct 2007 19:45:26 -0000 1.4 *************** *** 47,51 **** package org.CognitiveWeb.concurrent.schedule; - /** * Abstract base class for pre-condition or post-condition checks run before --- 47,50 ---- *************** *** 54,63 **** * @author thompsonbry */ ! abstract public class Condition ! { - public Condition() {} - /** * Test the state of the transaction. --- 53,61 ---- * @author thompsonbry */ + abstract public class Condition { ! public Condition() { ! } /** * Test the state of the transaction. *************** *** 69,74 **** * If the condition was violated. */ ! ! abstract public void check( Tx tx ); ! } \ No newline at end of file --- 67,72 ---- * If the condition was violated. */ ! ! abstract public void check(Tx tx); ! } \ No newline at end of file |