From: <jbo...@li...> - 2006-06-30 19:41:08
|
Author: estebanschifman
Date: 2006-06-30 15:41:00 -0400 (Fri, 30 Jun 2006)
New Revision: 4896

Added:
   labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/DrainQueuesAndTopics.java
Modified:
   labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/Controller.java
Log:
Added a utility class to drain queues and/or topics

Modified: labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/Controller.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/Controller.java 2006-06-30 19:15:02 UTC (rev 4895)
+++ labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/Controller.java 2006-06-30 19:41:00 UTC (rev 4896)
@@ -37,10 +37,11 @@
 
         Class[] oaTest =
         {
-             TestNotification.class
 //           TestParamsRepository.class
 //          ,TestPersonAddrPhone.class
 //          ,TestObjStore.class
+//          ,TestNotification.class
+             DrainQueuesAndTopics.class
         };
 
         for (Class oCls : oaTest)

Added: labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/DrainQueuesAndTopics.java
===================================================================
--- labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/DrainQueuesAndTopics.java 2006-06-30 19:15:02 UTC (rev 4895)
+++ labs/jbossesb/trunk/ESBCore/Tests/src/org/jboss/soa/esb/tests/DrainQueuesAndTopics.java 2006-06-30 19:41:00 UTC (rev 4896)
@@ -0,0 +1,196 @@
+package org.jboss.soa.esb.tests;
+
+import javax.jms.*;
+import javax.naming.*;
+
+import org.jboss.soa.esb.helpers.*;
+
+/**
+ * Test utility that drains JMS destinations: it keeps receiving (and
+ * printing) messages from a queue or topic until the destination has been
+ * silent for MAX_TIMES_NOMSG consecutive polls.
+ * <p>
+ * The constructor drains <code>queue/A</code>, <code>queue/B</code> and
+ * <code>topic/testTopic</code>, so instantiating the class runs the utility.
+ * <p>
+ * The JNDI context and the JMS connection in use are kept in instance fields
+ * without any synchronization.
+ */
+public class DrainQueuesAndTopics
+{
+    /** Consecutive empty polls after which a destination counts as drained. */
+    private static final int MAX_TIMES_NOMSG = 10;
+    /** Time a single poll waits for a message, in milliseconds. */
+    private static final long RECEIVE_TIMEOUT_MILLIS = 200;
+    private static final String JNDI_URL = "localhost";
+    private static final String JNDI_TYPE
+        = AppServerContext.SERVER_TYPE.jboss.toString();
+
+    /** JNDI context for the lookups; replaced by every purgeQueueTopic call. */
+    private Context m_oCtx;
+    /** Connection opened by the latest getQueue/getTopic call, kept so it can be closed. */
+    private javax.jms.Connection m_oConn;
+
+    /**
+     * Drains the three destinations this utility is hard-wired to.
+     *
+     * @throws Exception if obtaining the JNDI context, a lookup or a JMS call fails
+     */
+    public DrainQueuesAndTopics() throws Exception
+    {
+        purgeQueueTopic("queue/A");
+        purgeQueueTopic("queue/B");
+        purgeQueueTopic("topic/testTopic");
+    } //__________________________________
+
+    /**
+     * Receives and prints every message that can be obtained from a destination.
+     * <p>
+     * The first character of the name (case-insensitive) selects the kind of
+     * destination: <code>t</code> for a topic, <code>q</code> for a queue; any
+     * other name is skipped. Polling ends once MAX_TIMES_NOMSG polls in a row
+     * returned no message.
+     * <p>
+     * The JMS connection opened for the purge is closed before this method
+     * returns, whether it completes normally or throws.
+     *
+     * @param p_sName JNDI name of the destination, e.g. <code>queue/A</code>
+     * @return number of messages received; 0 when the name is skipped
+     * @throws IllegalArgumentException if <code>p_sName</code> is null or empty
+     * @throws Exception if obtaining the JNDI context, a lookup or a JMS call fails
+     */
+    public int purgeQueueTopic(String p_sName) throws Exception
+    {
+        if (null == p_sName || 0 == p_sName.length())
+            throw new IllegalArgumentException(
+                "JNDI name of the destination must not be null or empty");
+
+        // Forget any earlier connection: only the one opened below is closed here.
+        m_oConn = null;
+        m_oCtx = AppServerContext.getServerContext(JNDI_TYPE, JNDI_URL);
+
+        System.out.println(p_sName);
+        try
+        {
+            MessageConsumer oCns = null;
+            switch (Character.toLowerCase(p_sName.charAt(0)))
+            {
+                case 't': oCns = getTopic(p_sName); break;
+                case 'q': oCns = getQueue(p_sName); break;
+                default : break;
+            }
+            if (null == oCns)
+                return 0;
+
+            int iRet = 0;
+            int iNoRcv = 0;
+            while (iNoRcv < MAX_TIMES_NOMSG)
+            {
+                Message oMsg = oCns.receive(RECEIVE_TIMEOUT_MILLIS);
+                if (null == oMsg)
+                {
+                    iNoRcv++;
+                    System.out.print(".");
+                    continue;
+                }
+                iRet++;
+                dumpMessage(oMsg);
+                iNoRcv = 0; // a message arrived: restart the silence count
+            }
+            System.out.println();
+            return iRet;
+        }
+        finally
+        {
+            closeConnection();
+        }
+    } //__________________________________
+
+    /**
+     * Closes the tracked JMS connection, if there is one. A failure to close
+     * is reported on <code>System.err</code> instead of being thrown, so it
+     * cannot replace an exception raised by the purge itself.
+     */
+    private void closeConnection()
+    {
+        if (null == m_oConn)
+            return;
+        try
+        {
+            // Closing the connection also closes its sessions and consumers (JMS API).
+            m_oConn.close();
+        }
+        catch (javax.jms.JMSException e)
+        {
+            System.err.println("Could not close JMS connection: " + e);
+        }
+        finally
+        {
+            m_oConn = null;
+        }
+    } //__________________________________
+
+    /**
+     * Prints a received message on <code>System.out</code>.
+     *
+     * @param pM the message to print
+     */
+    void dumpMessage(Message pM)
+    {
+        System.out.println(pM);
+    } //__________________________________
+
+    /**
+     * Opens a started connection to a queue and returns a receiver for it.
+     * The context field must already be set (purgeQueueTopic does that). The
+     * new connection is remembered in <code>m_oConn</code>.
+     *
+     * @param p_sJndi JNDI name of the queue
+     * @return a receiver on a non-transacted, auto-acknowledging session
+     * @throws Exception if a lookup or a JMS call fails
+     */
+    QueueReceiver getQueue(String p_sJndi) throws Exception
+    {
+        // Resolve both JNDI names first so a failed lookup opens no connection.
+        QueueConnectionFactory oFactory
+            = (QueueConnectionFactory) m_oCtx.lookup("ConnectionFactory");
+        javax.jms.Queue oQueue = (javax.jms.Queue) m_oCtx.lookup(p_sJndi);
+
+        QueueConnection oQconn = oFactory.createQueueConnection();
+        m_oConn = oQconn; // tracked at once so purgeQueueTopic can close it if a later step fails
+        QueueSession oQsess
+            = oQconn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+        QueueReceiver oRcv = oQsess.createReceiver(oQueue);
+        oQconn.start();
+        return oRcv;
+    } //__________________________________
+
+    /**
+     * Opens a started connection to a topic and returns a subscriber for it.
+     * The context field must already be set (purgeQueueTopic does that). The
+     * new connection is remembered in <code>m_oConn</code>.
+     * <p>
+     * NOTE(review): per the JMS API, createSubscriber() creates a non-durable
+     * subscriber, which only sees messages published while it is active --
+     * confirm that this is what "draining" a topic is expected to mean.
+     *
+     * @param p_sJndi JNDI name of the topic
+     * @return a subscriber on a non-transacted, auto-acknowledging session
+     * @throws Exception if a lookup or a JMS call fails
+     */
+    TopicSubscriber getTopic(String p_sJndi) throws Exception
+    {
+        // Resolve both JNDI names first so a failed lookup opens no connection.
+        TopicConnectionFactory oFactory
+            = (TopicConnectionFactory) m_oCtx.lookup("ConnectionFactory");
+        Topic oT = (Topic) m_oCtx.lookup(p_sJndi);
+
+        TopicConnection oTconn = oFactory.createTopicConnection();
+        m_oConn = oTconn; // tracked at once so purgeQueueTopic can close it if a later step fails
+        TopicSession oTsess
+            = oTconn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
+        TopicSubscriber oRcv = oTsess.createSubscriber(oT);
+        oTconn.start();
+        return oRcv;
+    } //__________________________________
+} //____________________________________________________________________________