[Comsuite-svn] SF.net SVN: comsuite: [205] trunk/code/CSMiddleware/src/org/commsuite/queue /JMSQueu
                
                Brought to you by:
                
                    zduniak
                    
                
            
            
        
        
        
    | 
     
      
      
      From: <ma...@us...> - 2006-10-05 16:13:54
      
     
   | 
Revision: 205
          http://svn.sourceforge.net/comsuite/?rev=205&view=rev
Author:   marasm
Date:     2006-10-05 09:13:43 -0700 (Thu, 05 Oct 2006)
Log Message:
-----------
holding references to consumers
Modified Paths:
--------------
    trunk/code/CSMiddleware/src/org/commsuite/queue/JMSQueueManager.java
Modified: trunk/code/CSMiddleware/src/org/commsuite/queue/JMSQueueManager.java
===================================================================
--- trunk/code/CSMiddleware/src/org/commsuite/queue/JMSQueueManager.java	2006-10-05 13:48:41 UTC (rev 204)
+++ trunk/code/CSMiddleware/src/org/commsuite/queue/JMSQueueManager.java	2006-10-05 16:13:43 UTC (rev 205)
@@ -20,6 +20,7 @@
  */
 package org.commsuite.queue;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -45,10 +46,14 @@
  * @author Marcin Zduniak
  * @author Rafał Malinowski
  */
-public abstract class JMSQueueManager implements IQueueManager, ExceptionListener {
+public abstract class JMSQueueManager implements IQueueManager,
+        ExceptionListener {
 
-    private final static Logger logger = Logger.getLogger(JMSQueueManager.class);
+    private List<MessageConsumer> consumersList = new ArrayList<MessageConsumer>();
 
+    private final static Logger logger = Logger
+            .getLogger(JMSQueueManager.class);
+
     protected final static String QUEUE_PROPERTIES;
     static {
         // see: http://activemq.org/Destination+Options for all options
@@ -56,7 +61,8 @@
         sb.append("?");
         sb.append("consumer.dispatchAsync=true&"); // more:
         // http://incubator.apache.org/activemq/consumer-dispatch-async.html
-        sb.append("consumer.exclusive=true&"); // more: http://activemq.org/Exclusive+Consumer
+        sb.append("consumer.exclusive=true&"); // more:
+                                                // http://activemq.org/Exclusive+Consumer
         sb.append("consumer.prefetch=10&"); // more:
         // http://incubator.apache.org/activemq/what-is-the-prefetch-limit-for.html
         QUEUE_PROPERTIES = sb.toString();
@@ -106,7 +112,8 @@
         final List<AbstractQueueMessageConsumer> abstrConsumers = new FastTable<AbstractQueueMessageConsumer>();
         try {
             for (MessageConsumer msgCons : consumers.values()) {
-                abstrConsumers.add((AbstractQueueMessageConsumer) msgCons.getMessageListener());
+                abstrConsumers.add((AbstractQueueMessageConsumer) msgCons
+                        .getMessageListener());
             }
         } catch (JMSException je) {
             logger.fatal("", je);
@@ -117,7 +124,8 @@
     /**
      * @param queueName -
      *            name of queue that we are looking for
-     * @return Queue with specified name or null if there is no message with given name
+     * @return Queue with specified name or null if there is no message with
+     *         given name
      */
     public Queue getQueueByName(String queueName) {
         for (Queue queue : queues) {
@@ -126,8 +134,9 @@
                     return queue;
                 }
             } catch (JMSException je) {
-                logger.fatal("Unexpected exception while searching connection with name \""
-                        + queueName + "\"", je);
+                logger.fatal(
+                        "Unexpected exception while searching connection with name \""
+                                + queueName + "\"", je);
             }
         }
         logger.fatal("No connection with name \"" + queueName + "\" found !");
@@ -137,7 +146,8 @@
     /**
      * @param name -
      *            name of consumer that we are looking for
-     * @return Consumer with specified name or null if there is no message with given name
+     * @return Consumer with specified name or null if there is no message with
+     *         given name
      */
     public AbstractQueueMessageConsumer getConsumerByName(String name) {
         for (AbstractQueueMessageConsumer mc : getConsumersList()) {
@@ -187,14 +197,17 @@
         //
         // also if you know you are not going to reuse the Message object after
         // sending then disable copyMessageOnSend
-        // I was able to speed things up significantly by setting the asyncDispatch and
-        // useAsyncSend properties. Using the vm: transport also sped it up quite a
+        // I was able to speed things up significantly by setting the
+        // asyncDispatch and
+        // useAsyncSend properties. Using the vm: transport also sped it up
+        // quite a
         // bit.
     }
 
     private final Session getSession() throws JMSException {
         // try {
-        return connection.createSession(TRANSACTION_ENABLED, Session.AUTO_ACKNOWLEDGE);
+        return connection.createSession(TRANSACTION_ENABLED,
+                Session.AUTO_ACKNOWLEDGE);
         // } catch (JMSException je) {
         // logger.fatal("Fatal error while creating session", je);
         // throw new RuntimeException(je);
@@ -203,7 +216,8 @@
 
     protected Queue createQueue(String queueName) {
         final String queueNameWithProperties = queueName + QUEUE_PROPERTIES;
-        final Queue queue = new org.apache.activemq.command.ActiveMQQueue(queueNameWithProperties);
+        final Queue queue = new org.apache.activemq.command.ActiveMQQueue(
+                queueNameWithProperties);
         return queue;
     }
 
@@ -218,7 +232,8 @@
      * @throws RuntimeException
      *             if there is a problem with creating a connection
      */
-    public void associateNewConsumer(String queueName, AbstractQueueMessageConsumer consumer) {
+    public void associateNewConsumer(String queueName,
+            AbstractQueueMessageConsumer consumer) {
         Session session = null;
         try {
             ensureConnection();
@@ -228,12 +243,14 @@
             logger.debug("Session: " + session);
 
             final Queue queue = createQueue(queueName);
-            final MessageConsumer messageConsumer = session.createConsumer(queue);
+            final MessageConsumer messageConsumer = session
+                    .createConsumer(queue);
             consumer.setName(queueName);
             messageConsumer.setMessageListener(consumer);
             queues.add(queue);
             consumers.put(queueName, messageConsumer);
             sessions.put(queueName, session);
+            consumersList.add(messageConsumer);
         } catch (Throwable t) {
             logger.fatal("Fatal error while associating consumer \"" + consumer
                     + "\" with connection \"" + connection + "\"", t);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 |