User: chirino Date: 02/05/03 22:09:21 Modified: src/main/org/jboss/mq/server Tag: Branch_3_0 BasicQueue.java ClientConsumer.java JMSDestination.java JMSQueue.java JMSServerInvoker.java JMSTopic.java PersistentQueue.java Added: src/main/org/jboss/mq/server Tag: Branch_3_0 JMSDestinationManager.java JMSServerInterceptor.java JMSServerInterceptorSupport.java TracingInterceptor.java Removed: src/main/org/jboss/mq/server Tag: Branch_3_0 DestinationManager.java DestinationManagerMBean.java JBossMQService.java JBossMQServiceAdapter.java JBossMQServiceAdapterMBean.java JBossMQServiceMBean.java JMSServer.java JMSServerInvokerSupport.java JMSServerMBean.java LoggingServerInvoker.java QueueManager.java QueueManagerMBean.java TopicManager.java TopicManagerMBean.java Log: Backport of the refactoring that was done on head Revision Changes Path No revision No revision 1.15.2.2 +8 -4 jbossmq/src/main/org/jboss/mq/server/BasicQueue.java Index: BasicQueue.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v retrieving revision 1.15.2.1 retrieving revision 1.15.2.2 diff -u -r1.15.2.1 -r1.15.2.2 --- BasicQueue.java 25 Apr 2002 01:00:41 -0000 1.15.2.1 +++ BasicQueue.java 4 May 2002 05:09:21 -0000 1.15.2.2 @@ -36,14 +36,14 @@ * @author Norbert Lataille (Nor...@m4...) * @author David Maplesden (Dav...@or...) * @created August 16, 2001 - * @version $Revision: 1.15.2.1 $ + * @version $Revision: 1.15.2.2 $ */ //abstract public class BasicQueue implements Runnable { public class BasicQueue { //List of messages waiting to be dispatched SortedSet messages = new TreeSet(); //The JMSServer object - JMSServer server; + JMSDestinationManager server; // The subscribers waiting for messages HashSet receivers = new HashSet(); // The subscription that all messages will goto eventualy, set for a topic's @@ -61,14 +61,18 @@ /** * Used by p2p to hold messages in a the Queue */ - public BasicQueue(JMSServer server, String description) throws JMSException { + public BasicQueue(JMSDestinationManager server, String description) throws JMSException { this(server, description, null); } + public int getQueueDepth() { + return messages.size(); + } + /** * Used by a pub-sub to hold messages routed to a client. */ - public BasicQueue(JMSServer server, String description, Subscription exclusiveSubscription ) throws JMSException { + public BasicQueue(JMSDestinationManager server, String description, Subscription exclusiveSubscription ) throws JMSException { this.server = server; this.exclusiveSubscription = exclusiveSubscription; cat = Logger.getLogger( BasicQueue.class.getName()+"."+description); 1.11.2.1 +3 -3 jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java Index: ClientConsumer.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java,v retrieving revision 1.11 retrieving revision 1.11.2.1 diff -u -r1.11 -r1.11.2.1 --- ClientConsumer.java 6 Mar 2002 17:27:51 -0000 1.11 +++ ClientConsumer.java 4 May 2002 05:09:21 -0000 1.11.2.1 @@ -34,13 +34,13 @@ * @author Hiram Chirino (Coj...@ho...) * @author <a href="mailto:pr...@ti...">Peter Antman</a> * @created August 16, 2001 - * @version $Revision: 1.11 $ + * @version $Revision: 1.11.2.1 $ */ public class ClientConsumer implements Work { private Logger log; //The JMSServer object - JMSServer server; + JMSDestinationManager server; //The connection this queue will send messages over ConnectionToken connectionToken; //Is this connection enabled (Can we transmit to the receiver) @@ -74,7 +74,7 @@ // Constructor --------------------------------------------------- - public ClientConsumer(JMSServer server, ConnectionToken connectionToken) throws JMSException + public ClientConsumer(JMSDestinationManager server, ConnectionToken connectionToken) throws JMSException { this.server = server; this.connectionToken = connectionToken; 1.9.2.1 +3 -3 jbossmq/src/main/org/jboss/mq/server/JMSDestination.java Index: JMSDestination.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSDestination.java,v retrieving revision 1.9 retrieving revision 1.9.2.1 diff -u -r1.9 -r1.9.2.1 --- JMSDestination.java 28 Mar 2002 03:08:05 -0000 1.9 +++ JMSDestination.java 4 May 2002 05:09:21 -0000 1.9.2.1 @@ -30,7 +30,7 @@ * @author Hiram Chirino (Coj...@ho...) * @author David Maplesden (Dav...@or...) * @created August 16, 2001 - * @version $Revision: 1.9 $ + * @version $Revision: 1.9.2.1 $ */ public abstract class JMSDestination { @@ -39,7 +39,7 @@ //If this is a temporaryDestination, temporaryDestination=ClientConsumer of the owner, otherwise it's null ClientConsumer temporaryDestination; //The JMSServer object - JMSServer server; + JMSDestinationManager server; //Counter used to number incomming messages. (Used to order the messages.) long messageIdCounter = 0; @@ -47,7 +47,7 @@ Logger cat; // Constructor --------------------------------------------------- - JMSDestination( SpyDestination dest, ClientConsumer temporary, JMSServer server ) + JMSDestination( SpyDestination dest, ClientConsumer temporary, JMSDestinationManager server ) throws JMSException { cat = Logger.getLogger( this.getClass().getName() + ":" + dest ); destination = dest; 1.9.2.2 +3 -3 jbossmq/src/main/org/jboss/mq/server/JMSQueue.java Index: JMSQueue.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSQueue.java,v retrieving revision 1.9.2.1 retrieving revision 1.9.2.2 diff -u -r1.9.2.1 -r1.9.2.2 --- JMSQueue.java 25 Apr 2002 01:00:41 -0000 1.9.2.1 +++ JMSQueue.java 4 May 2002 05:09:21 -0000 1.9.2.2 @@ -30,14 +30,14 @@ * @author Hiram Chirino (Coj...@ho...) * @author David Maplesden (Dav...@or...) * @created August 16, 2001 - * @version $Revision: 1.9.2.1 $ + * @version $Revision: 1.9.2.2 $ */ public class JMSQueue extends JMSDestination { - BasicQueue queue; + public BasicQueue queue; // Constructor --------------------------------------------------- - JMSQueue( SpyDestination dest, ClientConsumer temporary, JMSServer server ) + public JMSQueue( SpyDestination dest, ClientConsumer temporary, JMSDestinationManager server ) throws JMSException { super( dest, temporary, server ); 1.1.2.1 +115 -69 jbossmq/src/main/org/jboss/mq/server/JMSServerInvoker.java Index: JMSServerInvoker.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSServerInvoker.java,v retrieving revision 1.1 retrieving revision 1.1.2.1 diff -u -r1.1 -r1.1.2.1 --- JMSServerInvoker.java 6 Mar 2002 17:27:51 -0000 1.1 +++ JMSServerInvoker.java 4 May 2002 05:09:21 -0000 1.1.2.1 @@ -18,36 +18,56 @@ import org.jboss.mq.AcknowledgementRequest; import org.jboss.mq.ConnectionToken; import org.jboss.mq.DurableSubscriptionID; +import org.jboss.mq.il.Invoker; +import org.jboss.mq.il.ServerIL; +import org.jboss.mq.ConnectionToken; import org.jboss.mq.SpyDestination; -import org.jboss.mq.SpyMessage; import org.jboss.mq.SpyTopic; -import org.jboss.mq.Subscription; +import org.jboss.mq.SpyMessage; import org.jboss.mq.TransactionRequest; +import org.jboss.mq.Subscription; +import org.jboss.logging.Logger; /** - * Invoker interface for clients IL accessing the JMSServer. - * - * Using an iterface for this layer makes it possible to put in logic - * without having to modify the server objekt. And also makes this - * pluggable. - * + * A pass through JMSServerInvoker. * * @author <a href="mailto:pr...@ti...">Peter Antman</a> - * @version $Revision: 1.1 $ + * @version $Revision: 1.1.2.1 $ */ -public interface JMSServerInvoker { - +public class JMSServerInvoker implements Invoker { + protected Logger log; + /** + * Next invoker in chain. + */ + protected JMSServerInterceptor nextInterceptor = null; + + public JMSServerInvoker() { + log = Logger.getLogger(this.getClass().getName()); + } + /** * Set next invoker in chain to be called. Is mot often the real JMSServer */ - public void setJMSServerInvoker(JMSServerInvoker server); + public void setNext(JMSServerInterceptor server) { + this.nextInterceptor = server; + } + + /** + * @see JMSServerInterceptor#getNext() + */ + public JMSServerInterceptor getNext() + { + return this.nextInterceptor; + } /** * Get the thread group of the server. */ - public ThreadGroup getThreadGroup(); + public ThreadGroup getThreadGroup() { + return nextInterceptor.getThreadGroup(); + } /** * Gets a clientID from server. * @@ -55,45 +75,55 @@ * @exception JMSException Description of Exception */ public String getID() - throws JMSException; + throws JMSException { + return nextInterceptor.getID(); + } /** - * Get a temporary topic. + * Gets the TemporaryTopic attribute of the ServerIL object * * @param dc Description of Parameter * @return The TemporaryTopic value * @exception JMSException Description of Exception */ public TemporaryTopic getTemporaryTopic(ConnectionToken dc) - throws JMSException; + throws JMSException { + return nextInterceptor.getTemporaryTopic(dc); + } /** - * Get a temporary queue + * Gets the TemporaryQueue attribute of the ServerIL object * * @param dc Description of Parameter * @return The TemporaryQueue value * @exception JMSException Description of Exception */ public TemporaryQueue getTemporaryQueue(ConnectionToken dc) - throws JMSException; + throws JMSException { + return nextInterceptor.getTemporaryQueue(dc); + } /** - * Close connection. + * #Description of the Method * * @param dc Description of Parameter * @exception JMSException Description of Exception */ public void connectionClosing(ConnectionToken dc) - throws JMSException; + throws JMSException { + nextInterceptor.connectionClosing(dc); + } /** * Check id, must not be taken. * - * @param ID Description of Parameter - * @exception JMSException Description of Exception + * @param ID a clientID + * @exception JMSException if ID is already taken */ public void checkID(String ID) - throws JMSException; + throws JMSException { + nextInterceptor.checkID(ID); + } /** * Add the message to the destination. @@ -103,14 +133,12 @@ * @exception JMSException Description of Exception */ public void addMessage(ConnectionToken dc, SpyMessage message) - throws JMSException; + throws JMSException { + nextInterceptor.addMessage(dc,message); + } /** - * Create a queue. - * - * The destination name must be the name of an already existing - * destination. This method should only be used to skip looking - * up a destination through JNDI, not to actually create a new destination. + * #Description of the Method * * @param dc Description of Parameter * @param dest Description of Parameter @@ -118,14 +146,12 @@ * @exception JMSException Description of Exception */ public Queue createQueue(ConnectionToken dc, String dest) - throws JMSException; + throws JMSException { + return nextInterceptor.createQueue(dc,dest); + } /** - * Create a topic. - * - * The destination name must be the name of an already existing - * destination. This method should only be used to skip looking - * up a destination through JNDI, not to actually create a new destination. + * #Description of the Method * * @param dc Description of Parameter * @param dest Description of Parameter @@ -133,7 +159,9 @@ * @exception JMSException Description of Exception */ public Topic createTopic(ConnectionToken dc, String dest) - throws JMSException; + throws JMSException { + return nextInterceptor.createTopic(dc,dest); + } /** * #Description of the Method @@ -143,7 +171,9 @@ * @exception JMSException Description of Exception */ public void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest) - throws JMSException; + throws JMSException { + nextInterceptor.deleteTemporaryDestination(dc,dest); + } /** * #Description of the Method @@ -153,7 +183,9 @@ * @exception JMSException Description of Exception */ public void transact(ConnectionToken dc, TransactionRequest t) - throws JMSException; + throws JMSException { + nextInterceptor.transact(dc,t); + } /** * #Description of the Method @@ -163,7 +195,9 @@ * @exception JMSException Description of Exception */ public void acknowledge(ConnectionToken dc, AcknowledgementRequest item) - throws JMSException; + throws JMSException { + nextInterceptor.acknowledge(dc,item); + } /** * #Description of the Method @@ -175,7 +209,9 @@ * @exception JMSException Description of Exception */ public SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector) - throws JMSException; + throws JMSException { + return nextInterceptor.browse(dc,dest,selector); + } /** * #Description of the Method @@ -187,7 +223,9 @@ * @exception JMSException Description of Exception */ public SpyMessage receive(ConnectionToken dc, int subscriberId, long wait) - throws JMSException; + throws JMSException { + return nextInterceptor.receive(dc,subscriberId,wait); + } /** * Sets the Enabled attribute of the ServerIL object @@ -197,81 +235,89 @@ * @exception JMSException Description of Exception */ public void setEnabled(ConnectionToken dc, boolean enabled) - throws JMSException; + throws JMSException { + nextInterceptor.setEnabled(dc,enabled); + } /** - * Close the server side message consumer. Client is no longer - * available to receive messages. + * #Description of the Method * * @param dc Description of Parameter * @param subscriptionId Description of Parameter * @exception JMSException Description of Exception */ public void unsubscribe(ConnectionToken dc, int subscriptionId) - throws JMSException; + throws JMSException { + nextInterceptor.unsubscribe(dc,subscriptionId); + } /** - * Unsubscribe from the durable subscription. + * #Description of the Method * * @param id Description of Parameter * @exception JMSException Description of Exception */ public void destroySubscription(ConnectionToken dc,DurableSubscriptionID id) - throws JMSException; + throws JMSException { + nextInterceptor.destroySubscription(dc,id); + } /** * Check user for autentication. * * @param userName Description of Parameter * @param password Description of Parameter - * @return a preconfigured clientId. + * @return a clientId. * @exception JMSException if user was not allowed to login */ public String checkUser(String userName, String password) - throws JMSException; - + throws JMSException { + return nextInterceptor.checkUser(userName,password); + } /** * Check user for autentication. * * @param userName Description of Parameter * @param password Description of Parameter - * @return a sessionId. + * @return a sessionId * @exception JMSException if user was not allowed to login */ public String authenticate(String userName, String password) - throws JMSException; - + throws JMSException { + return nextInterceptor.authenticate(userName,password); + } /** * @param dc org.jboss.mq.ConnectionToken * @param s org.jboss.mq.Subscription * @exception JMSException The exception description. */ - void subscribe(org.jboss.mq.ConnectionToken dc, org.jboss.mq.Subscription s) - throws JMSException; + public void subscribe(org.jboss.mq.ConnectionToken dc, org.jboss.mq.Subscription s) + throws JMSException { + nextInterceptor.subscribe(dc,s); + } /** - * Ping the server. + * #Description of the Method * * @param dc Description of Parameter * @param clientTime Description of Parameter * @exception JMSException Description of Exception */ public void ping(ConnectionToken dc, long clientTime) - throws JMSException; + throws JMSException { + nextInterceptor.ping(dc,clientTime); + } - /** - * Get the topic the durable subscription is on. - * Primary for internal use in the server, and not for the IL's. - */ public SpyTopic getDurableTopic(DurableSubscriptionID sub) - throws JMSException; + throws JMSException { + return nextInterceptor.getDurableTopic(sub); + } - /** - * Get the subscription that match the id. - * - * @exception JMSException if it can not find the subscription. - */ - public Subscription getSubscription(ConnectionToken dc,int subscriberId) throws JMSException; -} + public Subscription getSubscription(ConnectionToken dc,int subscriberId) throws JMSException { + return nextInterceptor.getSubscription(dc,subscriberId); + } + + +} // JMSServerInvokerSupport 1.12.2.3 +26 -3 jbossmq/src/main/org/jboss/mq/server/JMSTopic.java Index: JMSTopic.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSTopic.java,v retrieving revision 1.12.2.2 retrieving revision 1.12.2.3 diff -u -r1.12.2.2 -r1.12.2.3 --- JMSTopic.java 26 Apr 2002 04:33:32 -0000 1.12.2.2 +++ JMSTopic.java 4 May 2002 05:09:21 -0000 1.12.2.3 @@ -23,6 +23,8 @@ import org.jboss.mq.SpyMessage; import org.jboss.mq.SpyTopic; import org.jboss.mq.Subscription; +import org.jboss.mq.pm.Tx; +import org.jboss.mq.selectors.Selector; /** * This class is a message queue which is stored (hashed by Destination) on the @@ -32,7 +34,7 @@ * @author Hiram Chirino (Coj...@ho...) * @author David Maplesden (Dav...@or...) * @created August 16, 2001 - * @version $Revision: 1.12.2.2 $ + * @version $Revision: 1.12.2.3 $ */ public class JMSTopic extends JMSDestination { @@ -41,7 +43,7 @@ HashMap tempQueues = new HashMap(); // Constructor --------------------------------------------------- - JMSTopic(SpyDestination dest, ClientConsumer temporary, JMSServer server) throws JMSException { + public JMSTopic(SpyDestination dest, ClientConsumer temporary, JMSDestinationManager server) throws JMSException { super(dest, temporary, server); } @@ -142,7 +144,28 @@ SpyTopic dstopic = new SpyTopic((SpyTopic) destination, id); - BasicQueue queue = new PersistentQueue(server, dstopic); + // Create a + BasicQueue queue; + if( id.getSelector() == null ) { + queue = new PersistentQueue(server, dstopic); + } else { + // This guy drops messages if his selector does not match. + class SelectorPersistentQueue extends PersistentQueue { + Selector selector; + SelectorPersistentQueue(JMSDestinationManager server, SpyTopic dstopic, String selector) throws JMSException { + super( server, dstopic); + this.selector = new Selector(selector); + } + + public void addMessage( MessageReference mesRef, Tx txId ) throws JMSException { + if( selector.test(mesRef.getHeaders()) ) { + super.addMessage( mesRef, txId ); + } + } + } + queue = new SelectorPersistentQueue(server, dstopic, id.getSelector()); + } + synchronized (durQueues) { durQueues.put(id, queue); } 1.5.2.2 +1 -1 jbossmq/src/main/org/jboss/mq/server/PersistentQueue.java Index: PersistentQueue.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/PersistentQueue.java,v retrieving revision 1.5.2.1 retrieving revision 1.5.2.2 diff -u -r1.5.2.1 -r1.5.2.2 --- PersistentQueue.java 25 Apr 2002 01:00:41 -0000 1.5.2.1 +++ PersistentQueue.java 4 May 2002 05:09:21 -0000 1.5.2.2 @@ -16,7 +16,7 @@ public class PersistentQueue extends org.jboss.mq.server.BasicQueue { SpyDestination destination; - public PersistentQueue( JMSServer server, SpyDestination destination ) + public PersistentQueue( JMSDestinationManager server, SpyDestination destination ) throws JMSException { super( server, destination.toString() ); this.destination = destination; No revision No revision 1.1.2.1 +1 -1 jbossmq/src/main/org/jboss/mq/server/JMSDestinationManager.java Index: JMSDestinationManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSDestinationManager.java,v retrieving revision 1.1 retrieving revision 1.1.2.1 diff -u -r1.1 -r1.1.2.1 --- JMSDestinationManager.java 4 May 2002 03:07:52 -0000 1.1 +++ JMSDestinationManager.java 4 May 2002 05:09:21 -0000 1.1.2.1 @@ -52,7 +52,7 @@ * @author Hiram Chirino (Coj...@ho...) * @author David Maplesden (Dav...@or...) * @author <a href="mailto:pr...@ti...">Peter Antman</a> - * @version $Revision: 1.1 $ + * @version $Revision: 1.1.2.1 $ */ public class JMSDestinationManager extends JMSServerInterceptorSupport { 1.1.2.1 +0 -0 jbossmq/src/main/org/jboss/mq/server/JMSServerInterceptor.java Index: JMSServerInterceptor.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSServerInterceptor.java,v retrieving revision 1.1 retrieving revision 1.1.2.1 diff -u -r1.1 -r1.1.2.1 --- JMSServerInterceptor.java 4 May 2002 03:07:52 -0000 1.1 +++ JMSServerInterceptor.java 4 May 2002 05:09:21 -0000 1.1.2.1 @@ -34,7 +34,7 @@ * * * @author <a href="mailto:pr...@ti...">Peter Antman</a> - * @version $Revision: 1.1 $ + * @version $Revision: 1.1.2.1 $ */ public interface JMSServerInterceptor { 1.1.2.1 +0 -0 jbossmq/src/main/org/jboss/mq/server/JMSServerInterceptorSupport.java Index: JMSServerInterceptorSupport.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSServerInterceptorSupport.java,v retrieving revision 1.1 retrieving revision 1.1.2.1 diff -u -r1.1 -r1.1.2.1 --- JMSServerInterceptorSupport.java 4 May 2002 03:07:52 -0000 1.1 +++ JMSServerInterceptorSupport.java 4 May 2002 05:09:21 -0000 1.1.2.1 @@ -30,7 +30,7 @@ * A pass through JMSServerInvoker. * * @author <a href="mailto:pr...@ti...">Peter Antman</a> - * @version $Revision: 1.1 $ + * @version $Revision: 1.1.2.1 $ */ public class JMSServerInterceptorSupport implements JMSServerInterceptor { 1.1.2.1 +0 -0 jbossmq/src/main/org/jboss/mq/server/TracingInterceptor.java Index: TracingInterceptor.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/TracingInterceptor.java,v retrieving revision 1.1 retrieving revision 1.1.2.1 diff -u -r1.1 -r1.1.2.1 --- TracingInterceptor.java 4 May 2002 03:07:52 -0000 1.1 +++ TracingInterceptor.java 4 May 2002 05:09:21 -0000 1.1.2.1 @@ -30,7 +30,7 @@ * A pass through Interceptor, wich will trace all calls. * * @author <a href="mailto:pr...@ti...">Peter Antman</a> - * @version $Revision: 1.1 $ + * @version $Revision: 1.1.2.1 $ */ public class TracingInterceptor extends JMSServerInterceptorSupport { |