|
From: <jbo...@li...> - 2006-06-30 19:41:08
|
Author: estebanschifman
Date: 2006-06-30 15:41:00 -0400 (Fri, 30 Jun 2006)
New Revision: 4896
Added:
labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/DrainQueuesAndTopics.java
Modified:
labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/Controller.java
Log:
Added a utility class to drain queues and/or topics
Modified: labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/Controller.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/Controller.java 2006-06-30 19:15:02 UTC (rev 4895)
+++ labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/Controller.java 2006-06-30 19:41:00 UTC (rev 4896)
@@ -37,10 +37,11 @@
Class[] oaTest =
{
- TestNotification.class
// TestParamsRepository.class
// ,TestPersonAddrPhone.class
// ,TestObjStore.class
+// ,TestNotification.class
+ DrainQueuesAndTopics.class
};
for (Class oCls : oaTest)
Added: labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/DrainQueuesAndTopics.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/DrainQueuesAndTopics.java 2006-06-30 19:15:02 UTC (rev 4895)
+++ labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/DrainQueuesAndTopics.java 2006-06-30 19:41:00 UTC (rev 4896)
@@ -0,0 +1,92 @@
+package org.jboss.soa.esb.tests;
+
+import javax.jms.*;
+import javax.naming.*;
+
+import org.jboss.soa.esb.helpers.*;
+
+public class DrainQueuesAndTopics
+{
+ private static final int MAX_TIMES_NOMSG = 10;
+ private static final String JNDI_URL = "localhost";
+ private static final String JNDI_TYPE
+ = AppServerContext.SERVER_TYPE.jboss.toString();
+
+ private Context m_oCtx;
+
+ public DrainQueuesAndTopics() throws Exception
+ {
+ purgeQueueTopic("queue/A");
+ purgeQueueTopic("queue/B");
+ purgeQueueTopic("topic/testTopic");
+ } //__________________________________
+
+ public int purgeQueueTopic(String p_sName) throws Exception
+ {
+ int iRet = 0;
+ m_oCtx = AppServerContext.getServerContext(JNDI_TYPE,JNDI_URL);
+
+ System.out.println(p_sName);
+ MessageConsumer oCns = null;
+ switch(p_sName.toLowerCase().charAt(0))
+ { case 't' : oCns = getTopic(p_sName); break;
+ case 'q' : oCns = getQueue(p_sName); break;
+ }
+ if (null==oCns)
+ return 0;
+
+ for (int iNoRcv=0; iNoRcv < MAX_TIMES_NOMSG; )
+ { Message oMsg = oCns.receive(200);
+ if (null==oMsg)
+ { iNoRcv++;
+ System.out.print(".");
+ continue;
+ }
+ iRet++;
+ dumpMessage(oMsg);
+ iNoRcv = 0;
+ }
+ System.out.println();
+ return iRet;
+ } //__________________________________
+
+ void dumpMessage(Message pM)
+ {
+ System.out.println(pM);
+ } //__________________________________
+
+ QueueReceiver getQueue(String p_sJndi) throws Exception
+ {
+ QueueConnection oQconn = null;
+ QueueSession oQsess = null;
+ QueueConnectionFactory qcf = (QueueConnectionFactory) m_oCtx
+ .lookup("ConnectionFactory");
+
+ oQconn = qcf.createQueueConnection();
+ oQsess = oQconn.createQueueSession(false
+ ,QueueSession.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue oQueue
+ = (javax.jms.Queue) m_oCtx.lookup(p_sJndi);
+
+ QueueReceiver oRcv = oQsess.createReceiver(oQueue);
+ oQconn.start();
+ return oRcv;
+ } //__________________________________
+
+ TopicSubscriber getTopic(String p_sJndi) throws Exception
+ {
+ TopicConnection oTconn = null;
+ TopicSession oTsess = null;
+ TopicConnectionFactory qcf = (TopicConnectionFactory) m_oCtx
+ .lookup("ConnectionFactory");
+
+ oTconn = qcf.createTopicConnection();
+ oTsess = oTconn.createTopicSession(false
+ ,QueueSession.AUTO_ACKNOWLEDGE);
+ Topic oT = (Topic) m_oCtx.lookup(p_sJndi);
+
+ TopicSubscriber oRcv = oTsess.createSubscriber(oT);
+ oTconn.start();
+ return oRcv;
+ } //__________________________________
+} //____________________________________________________________________________
|