From: Jason D. <us...@us...> - 2002-05-25 22:51:04
|
User: user57 Date: 02/05/25 15:51:03 Modified: src/main/org/jboss/mq SpyMessageConsumer.java SpySession.java SpyXAResource.java SpyXAResourceManager.java Log: o Merged from Branch_3_0; SpySession.currentTransactionId and trace logging changes Revision Changes Path 1.18 +10 -9 jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java Index: SpyMessageConsumer.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessageConsumer.java,v retrieving revision 1.17 retrieving revision 1.18 diff -u -r1.17 -r1.18 --- SpyMessageConsumer.java 21 Apr 2002 21:18:51 -0000 1.17 +++ SpyMessageConsumer.java 25 May 2002 22:51:03 -0000 1.18 @@ -474,7 +474,7 @@ if ( session.transacted ) { - session.connection.spyXAResourceManager.ackMessage( session.currentTransactionId, mes ); + session.connection.spyXAResourceManager.ackMessage( session.getCurrentTransactionId(), mes ); } //Handle runtime exceptions. These are handled as per the spec if you assume @@ -537,17 +537,18 @@ } // Add the message to XAResource manager before we call onMessages since the - // resource may get elisted IN the onMessage method. This gives onMessage a chance to roll the message back. + // resource may get elisted IN the onMessage method. + // This gives onMessage a chance to roll the message back. Object anonymousTXID=null; if ( session.transacted ) { // Only happens with XA transactions - if( session.currentTransactionId == null ) + if( session.getCurrentTransactionId() == null ) { anonymousTXID = session.connection.spyXAResourceManager.startTx(); - session.currentTransactionId = anonymousTXID; + session.setCurrentTransactionId(anonymousTXID); } - session.connection.spyXAResourceManager.ackMessage( session.currentTransactionId, message ); + session.connection.spyXAResourceManager.ackMessage( session.getCurrentTransactionId(), message ); } if ( thisListener != null ) @@ -565,7 +566,7 @@ // If we started an anonymous tx if (anonymousTXID != null) { - if (session.currentTransactionId == anonymousTXID) + if (session.getCurrentTransactionId() == anonymousTXID) { // This is bad.. We are an XA controled TX but no TM ever elisted us. // rollback the work and spit an error @@ -578,7 +579,7 @@ log.error("Could not rollback", e); } finally { - session.currentTransactionId = null; + session.unsetCurrentTransactionId(); } throw new SpyJMSException("Messaged delivery was not controled by a Transaction Manager"); } @@ -646,7 +647,7 @@ if ( session.transacted ) { - session.connection.spyXAResourceManager.ackMessage( session.currentTransactionId, message ); + session.connection.spyXAResourceManager.ackMessage( session.getCurrentTransactionId(), message ); } else if ( session.acknowledgeMode == session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE ) { message.doAcknowledge(); 1.14 +57 -10 jbossmq/src/main/org/jboss/mq/SpySession.java Index: SpySession.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpySession.java,v retrieving revision 1.13 retrieving revision 1.14 diff -u -r1.13 -r1.14 --- SpySession.java 22 May 2002 19:25:09 -0000 1.13 +++ SpySession.java 25 May 2002 22:51:03 -0000 1.14 @@ -52,6 +52,7 @@ protected int acknowledgeMode; //MessageConsumers created by this session protected HashSet consumers; + // This consumer is the consumer that receives messages for the MessageListener // assigned to the session. The SpyConnectionConsumer delivers messages to him SpyMessageConsumer sessionConsumer; @@ -62,15 +63,18 @@ // Used to lock the run() method Object runLock = new Object(); - //The transctionId of the current transaction (registed with the SpyXAResourceManager) - Object currentTransactionId; + /** + * The transctionId of the current transaction (registed with the SpyXAResourceManager). + */ + private Object currentTransactionId; + // If this is an XASession, we have an associated XAResource SpyXAResource spyXAResource; // Optional Connection consumer methods java.util.LinkedList messages = new java.util.LinkedList(); - static Logger cat = Logger.getLogger( SpySession.class ); + static Logger log = Logger.getLogger( SpySession.class ); // Constructor --------------------------------------------------- @@ -89,9 +93,37 @@ //Have a TX ready with the resource manager. if ( spyXAResource == null && transacted ) { currentTransactionId = connection.spyXAResourceManager.startTx(); + if (log.isTraceEnabled()) { + log.trace("Current transaction id: " + currentTransactionId); + } } } + void setCurrentTransactionId(final Object xid) + { + if (xid == null) { + throw new org.jboss.util.NullArgumentException("xid"); + } + + if (log.isTraceEnabled()) { + log.trace("setting current tx id: " + xid + ", previous: " + currentTransactionId); + } + + this.currentTransactionId = xid; + } + + void unsetCurrentTransactionId() + { + log.trace("Unsetting current tx id"); + + this.currentTransactionId = null; + } + + Object getCurrentTransactionId() + { + return currentTransactionId; + } + public void setMessageListener( MessageListener listener ) throws JMSException { if ( closed ) { @@ -213,7 +245,7 @@ SpyMessage message = ( SpyMessage )messages.removeFirst(); try { if ( sessionConsumer == null ) { - cat.warn( "Session has no message listener set, cannot process message." ); + log.warn( "Session has no message listener set, cannot process message." ); //Nack message connection.send( message.getAcknowledgementRequest( false ) ); } else { @@ -228,7 +260,7 @@ public synchronized void close() throws JMSException { - cat.debug("Session closing."); + log.debug("Session closing."); synchronized ( runLock ) { @@ -289,7 +321,15 @@ } finally { try { currentTransactionId = connection.spyXAResourceManager.startTx(); - } catch ( Exception ignore ) { + + if (log.isTraceEnabled()) { + log.trace("Current transaction id: " + currentTransactionId); + } + } + catch ( Exception ignore ) { + if (log.isTraceEnabled()) { + log.trace("Failed to start tx", ignore); + } } } @@ -322,7 +362,14 @@ } finally { try { currentTransactionId = connection.spyXAResourceManager.startTx(); - } catch ( Exception ignore ) { + if (log.isTraceEnabled()) { + log.trace("Current transaction id: " + currentTransactionId); + } + } + catch ( Exception ignore ) { + if (log.isTraceEnabled()) { + log.trace("Failed to start tx", ignore); + } } } @@ -344,8 +391,8 @@ public void deleteTemporaryDestination( SpyDestination dest ) throws JMSException { - if (cat.isDebugEnabled()) - cat.debug( "SpySession: deleteDestination(dest=" + dest.toString() + ")" ); + if (log.isDebugEnabled()) + log.debug( "SpySession: deleteDestination(dest=" + dest.toString() + ")" ); synchronized ( consumers ) { HashSet newMap = ( HashSet )consumers.clone(); 1.5 +14 -12 jbossmq/src/main/org/jboss/mq/SpyXAResource.java Index: SpyXAResource.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResource.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- SpyXAResource.java 15 May 2002 03:05:04 -0000 1.4 +++ SpyXAResource.java 25 May 2002 22:51:03 -0000 1.5 @@ -4,9 +4,11 @@ * Distributable under LGPL license. * See terms of license at gnu.org. */ + package org.jboss.mq; import javax.jms.JMSException; + import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; @@ -24,7 +26,7 @@ implements XAResource { private static final Logger log = Logger.getLogger(SpyXAResource.class); - + ////////////////////////////////////////////////////////////////// // Attributes ////////////////////////////////////////////////////////////////// @@ -118,7 +120,7 @@ log.trace("End xid=" + xid + ", flags=" + flags); } - if ( session.currentTransactionId == null ) { + if ( session.getCurrentTransactionId() == null ) { throw new XAException( XAException.XAER_OUTSIDE ); } @@ -126,15 +128,15 @@ switch ( flags ) { case TMSUSPEND: - session.currentTransactionId = null; + session.unsetCurrentTransactionId(); session.connection.spyXAResourceManager.suspendTx( xid ); break; case TMFAIL: - session.currentTransactionId = null; + session.unsetCurrentTransactionId(); session.connection.spyXAResourceManager.endTx( xid, false ); break; case TMSUCCESS: - session.currentTransactionId = null; + session.unsetCurrentTransactionId(); session.connection.spyXAResourceManager.endTx( xid, true ); break; } @@ -230,8 +232,8 @@ } boolean convertTx=false; - if ( session.currentTransactionId != null ) { - if( flags==TMNOFLAGS && session.currentTransactionId instanceof Long ) { + if ( session.getCurrentTransactionId() != null ) { + if( flags==TMNOFLAGS && session.getCurrentTransactionId() instanceof Long ) { convertTx=true; } else { throw new XAException( XAException.XAER_OUTSIDE ); @@ -245,16 +247,16 @@ if( convertTx ) { // it was an anonymous TX, TM is now taking control over it. // convert it over to a normal XID tansaction. - session.currentTransactionId = session.connection.spyXAResourceManager.convertTx( (Long)session.currentTransactionId, xid ); + session.setCurrentTransactionId(session.connection.spyXAResourceManager.convertTx( (Long)session.getCurrentTransactionId(), xid )); } else { - session.currentTransactionId = session.connection.spyXAResourceManager.startTx( xid ); + session.setCurrentTransactionId(session.connection.spyXAResourceManager.startTx( xid )); } break; case TMJOIN: - session.currentTransactionId = session.connection.spyXAResourceManager.joinTx( xid ); + session.setCurrentTransactionId(session.connection.spyXAResourceManager.joinTx( xid )); break; case TMRESUME: - session.currentTransactionId = session.connection.spyXAResourceManager.resumeTx( xid ); + session.setCurrentTransactionId(session.connection.spyXAResourceManager.resumeTx( xid )); break; } session.runLock.notify(); 1.8 +86 -19 jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java Index: SpyXAResourceManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java,v retrieving revision 1.7 retrieving revision 1.8 diff -u -r1.7 -r1.8 --- SpyXAResourceManager.java 15 May 2002 03:05:18 -0000 1.7 +++ SpyXAResourceManager.java 25 May 2002 22:51:03 -0000 1.8 @@ -9,15 +9,17 @@ import java.util.HashMap; import java.util.LinkedList; - import java.util.Map; + import javax.jms.JMSException; -import javax.transaction.xa.XAException; +import javax.transaction.xa.XAException; import javax.transaction.xa.Xid; import org.jboss.logging.Logger; +import org.jboss.logging.Logger; + /** * This class implements the ResourceManager used for the XAResources used int * JBossMQ. @@ -34,9 +36,9 @@ ////////////////////////////////////////////////////////////////// // Attributes ////////////////////////////////////////////////////////////////// - Connection connection; - Map transactions = java.util.Collections.synchronizedMap( new HashMap() ); - long nextInternalXid = Long.MIN_VALUE; + private Connection connection; + private Map transactions = java.util.Collections.synchronizedMap( new HashMap() ); + private long nextInternalXid = Long.MIN_VALUE; //Valid tx states: private final static byte TX_OPEN = 0; @@ -62,7 +64,12 @@ ////////////////////////////////////////////////////////////////// public void ackMessage( Object xid, SpyMessage msg ) - throws JMSException { + throws JMSException + { + if (log.isTraceEnabled()) { + log.trace("Ack'ing message xid=" + xid); + } + TXState state = ( TXState )transactions.get( xid ); if ( state == null ) { throw new JMSException( "Invalid transaction id." ); @@ -83,7 +90,7 @@ if (trace) { log.trace("TXState=" + state); } - + if ( state == null ) { throw new JMSException( "Invalid transaction id." ); } @@ -92,7 +99,12 @@ } public void commit( Object xid, boolean onePhase ) - throws XAException, JMSException { + throws XAException, JMSException + { + if (log.isTraceEnabled()) { + log.trace("Commiting xid=" + xid + ", onePhase=" + onePhase); + } + TXState state = ( TXState )transactions.remove( xid ); if ( state == null ) { throw new XAException( XAException.XAER_NOTA ); @@ -126,7 +138,12 @@ } public void endTx( Object xid, boolean success ) - throws XAException { + throws XAException + { + if (log.isTraceEnabled()) { + log.trace("Ending xid=" + xid + ", success=" + success); + } + TXState state = ( TXState )transactions.get( xid ); if ( state == null ) { throw new XAException( XAException.XAER_NOTA ); @@ -135,7 +152,12 @@ } public Object joinTx( Xid xid ) - throws XAException { + throws XAException + { + if (log.isTraceEnabled()) { + log.trace("Joining tx xid=" + xid); + } + if ( !transactions.containsKey( xid ) ) { throw new XAException( XAException.XAER_NOTA ); } @@ -143,7 +165,12 @@ } public int prepare( Object xid ) - throws XAException, JMSException { + throws XAException, JMSException + { + if (log.isTraceEnabled()) { + log.trace("Preparing xid=" + xid); + } + TXState state = ( TXState )transactions.get( xid ); if ( state == null ) { throw new XAException( XAException.XAER_NOTA ); @@ -167,7 +194,12 @@ } public Object resumeTx( Xid xid ) - throws XAException { + throws XAException + { + if (log.isTraceEnabled()) { + log.trace("Resuming tx xid=" + xid); + } + if ( !transactions.containsKey( xid ) ) { throw new XAException( XAException.XAER_NOTA ); } @@ -175,7 +207,11 @@ } public void rollback( Object xid ) - throws XAException, JMSException { + throws XAException, JMSException + { + if (log.isTraceEnabled()) { + log.trace("Rolling back xid=" + xid); + } TXState state = ( TXState )transactions.remove( xid ); if ( state == null ) { @@ -204,14 +240,25 @@ state.txState = TX_ROLLEDBACK; } - public synchronized Object startTx() { + public synchronized Object startTx() + { Long newXid = new Long( nextInternalXid++ ); transactions.put( newXid, new TXState() ); + + if (log.isTraceEnabled()) { + log.trace("Starting tx with new xid=" + newXid); + } + return newXid; } public Object startTx( Xid xid ) - throws XAException { + throws XAException + { + if (log.isTraceEnabled()) { + log.trace("Starting tx xid=" + xid); + } + if ( transactions.containsKey( xid ) ) { throw new XAException( XAException.XAER_DUPID ); } @@ -220,14 +267,23 @@ } public Object suspendTx( Xid xid ) - throws XAException { + throws XAException + { + if (log.isTraceEnabled()) { + log.trace("Suppending tx xid=" + xid); + } + if ( !transactions.containsKey( xid ) ) { throw new XAException( XAException.XAER_NOTA ); } return xid; } - public Object convertTx( Long anonXid, Xid xid ) throws XAException { + public Object convertTx( Long anonXid, Xid xid ) throws XAException + { + if (log.isTraceEnabled()) { + log.trace("Converting tx anonXid=" + anonXid + ", xid=" + xid); + } if ( !transactions.containsKey( anonXid ) ) { throw new XAException( XAException.XAER_NOTA ); @@ -236,6 +292,7 @@ throw new XAException( XAException.XAER_DUPID ); } TXState s = (TXState)transactions.remove( anonXid ); + transactions.put( xid, s ); return xid; } @@ -247,9 +304,19 @@ /** * @created August 16, 2001 */ - class TXState { + class TXState + { byte txState = TX_OPEN; LinkedList sentMessages = new LinkedList(); LinkedList ackedMessages = new LinkedList(); + + public String toString() + { + return super.toString() + + "{ txState=" + txState + + ", sendMessages=" + sentMessages + + ", ackedMessages=" + ackedMessages + + " }"; + } } } |