Thread: [Comsuite-svn] SF.net SVN: comsuite: [205] trunk/code/CSMiddleware/src/org/commsuite/queue /JMSQueu
Brought to you by:
zduniak
|
From: <ma...@us...> - 2006-10-05 16:13:54
|
Revision: 205
http://svn.sourceforge.net/comsuite/?rev=205&view=rev
Author: marasm
Date: 2006-10-05 09:13:43 -0700 (Thu, 05 Oct 2006)
Log Message:
-----------
holding references to consumers
Modified Paths:
--------------
trunk/code/CSMiddleware/src/org/commsuite/queue/JMSQueueManager.java
Modified: trunk/code/CSMiddleware/src/org/commsuite/queue/JMSQueueManager.java
===================================================================
--- trunk/code/CSMiddleware/src/org/commsuite/queue/JMSQueueManager.java 2006-10-05 13:48:41 UTC (rev 204)
+++ trunk/code/CSMiddleware/src/org/commsuite/queue/JMSQueueManager.java 2006-10-05 16:13:43 UTC (rev 205)
@@ -20,6 +20,7 @@
*/
package org.commsuite.queue;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -45,10 +46,14 @@
* @author Marcin Zduniak
* @author Rafał Malinowski
*/
-public abstract class JMSQueueManager implements IQueueManager, ExceptionListener {
+public abstract class JMSQueueManager implements IQueueManager,
+ ExceptionListener {
- private final static Logger logger = Logger.getLogger(JMSQueueManager.class);
+ private List<MessageConsumer> consumersList = new ArrayList<MessageConsumer>();
+ private final static Logger logger = Logger
+ .getLogger(JMSQueueManager.class);
+
protected final static String QUEUE_PROPERTIES;
static {
// see: http://activemq.org/Destination+Options for all options
@@ -56,7 +61,8 @@
sb.append("?");
sb.append("consumer.dispatchAsync=true&"); // more:
// http://incubator.apache.org/activemq/consumer-dispatch-async.html
- sb.append("consumer.exclusive=true&"); // more: http://activemq.org/Exclusive+Consumer
+ sb.append("consumer.exclusive=true&"); // more:
+ // http://activemq.org/Exclusive+Consumer
sb.append("consumer.prefetch=10&"); // more:
// http://incubator.apache.org/activemq/what-is-the-prefetch-limit-for.html
QUEUE_PROPERTIES = sb.toString();
@@ -106,7 +112,8 @@
final List<AbstractQueueMessageConsumer> abstrConsumers = new FastTable<AbstractQueueMessageConsumer>();
try {
for (MessageConsumer msgCons : consumers.values()) {
- abstrConsumers.add((AbstractQueueMessageConsumer) msgCons.getMessageListener());
+ abstrConsumers.add((AbstractQueueMessageConsumer) msgCons
+ .getMessageListener());
}
} catch (JMSException je) {
logger.fatal("", je);
@@ -117,7 +124,8 @@
/**
* @param queueName -
* name of queue that we are looking for
- * @return Queue with specified name or null if there is no message with given name
+ * @return Queue with specified name or null if there is no message with
+ * given name
*/
public Queue getQueueByName(String queueName) {
for (Queue queue : queues) {
@@ -126,8 +134,9 @@
return queue;
}
} catch (JMSException je) {
- logger.fatal("Unexpected exception while searching connection with name \""
- + queueName + "\"", je);
+ logger.fatal(
+ "Unexpected exception while searching connection with name \""
+ + queueName + "\"", je);
}
}
logger.fatal("No connection with name \"" + queueName + "\" found !");
@@ -137,7 +146,8 @@
/**
* @param name -
* name of consumer that we are looking for
- * @return Consumer with specified name or null if there is no message with given name
+ * @return Consumer with specified name or null if there is no message with
+ * given name
*/
public AbstractQueueMessageConsumer getConsumerByName(String name) {
for (AbstractQueueMessageConsumer mc : getConsumersList()) {
@@ -187,14 +197,17 @@
//
// also if you know you are not going to reuse the Message object after
// sending then disable copyMessageOnSend
- // I was able to speed things up significantly by setting the asyncDispatch and
- // useAsyncSend properties. Using the vm: transport also sped it up quite a
+ // I was able to speed things up significantly by setting the
+ // asyncDispatch and
+ // useAsyncSend properties. Using the vm: transport also sped it up
+ // quite a
// bit.
}
private final Session getSession() throws JMSException {
// try {
- return connection.createSession(TRANSACTION_ENABLED, Session.AUTO_ACKNOWLEDGE);
+ return connection.createSession(TRANSACTION_ENABLED,
+ Session.AUTO_ACKNOWLEDGE);
// } catch (JMSException je) {
// logger.fatal("Fatal error while creating session", je);
// throw new RuntimeException(je);
@@ -203,7 +216,8 @@
protected Queue createQueue(String queueName) {
final String queueNameWithProperties = queueName + QUEUE_PROPERTIES;
- final Queue queue = new org.apache.activemq.command.ActiveMQQueue(queueNameWithProperties);
+ final Queue queue = new org.apache.activemq.command.ActiveMQQueue(
+ queueNameWithProperties);
return queue;
}
@@ -218,7 +232,8 @@
* @throws RuntimeException
* if there is a problem with creating a connection
*/
- public void associateNewConsumer(String queueName, AbstractQueueMessageConsumer consumer) {
+ public void associateNewConsumer(String queueName,
+ AbstractQueueMessageConsumer consumer) {
Session session = null;
try {
ensureConnection();
@@ -228,12 +243,14 @@
logger.debug("Session: " + session);
final Queue queue = createQueue(queueName);
- final MessageConsumer messageConsumer = session.createConsumer(queue);
+ final MessageConsumer messageConsumer = session
+ .createConsumer(queue);
consumer.setName(queueName);
messageConsumer.setMessageListener(consumer);
queues.add(queue);
consumers.put(queueName, messageConsumer);
sessions.put(queueName, session);
+ consumersList.add(messageConsumer);
} catch (Throwable t) {
logger.fatal("Fatal error while associating consumer \"" + consumer
+ "\" with connection \"" + connection + "\"", t);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|