From: Ovidiu F. <ovi...@jb...> - 2005-05-17 22:40:53
|
User: ovidiu Date: 05/05/17 18:40:40 Modified: tests/src/org/jboss/test/messaging/jms MessageConsumerTest.java Log: Last check in before changing the Router interface (see http://www.jboss.org/index.html?module=bb&op=viewtopic&t=64056) Revision Changes Path 1.6 +92 -2 jboss-jms/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java (In the diff below, changes in quantity of whitespace are not shown.) Index: MessageConsumerTest.java =================================================================== RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -b -r1.5 -r1.6 --- MessageConsumerTest.java 10 May 2005 04:59:16 -0000 1.5 +++ MessageConsumerTest.java 17 May 2005 22:40:40 -0000 1.6 @@ -18,6 +18,7 @@ import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageListener; +import javax.jms.JMSException; import javax.naming.InitialContext; import java.util.List; import java.util.Collections; @@ -26,7 +27,7 @@ /** * @author <a href="mailto:ov...@jb...">Ovidiu Feodorov</a> - * @version <tt>$Revision: 1.5 $</tt> + * @version <tt>$Revision: 1.6 $</tt> */ public class MessageConsumerTest extends MessagingTestCase { @@ -41,6 +42,8 @@ protected MessageProducer topicProducer, queueProducer; protected MessageConsumer topicConsumer, queueConsumer; + protected Thread worker1; + // Constructors -------------------------------------------------- public MessageConsumerTest(String name) @@ -80,6 +83,11 @@ producerConnection.close(); consumerConnection.close(); + if (worker1 != null) + { + worker1.interrupt(); + } + ServerManagement.undeployTopic("Topic"); ServerManagement.undeployQueue("Queue"); ServerManagement.stopInVMServer(); @@ -270,7 +278,9 @@ // // } - + // + // MessageListener tests + // public void testMessageListenerOnTopic() throws Exception { @@ -281,9 +291,68 @@ Message m1 = producerSession.createMessage(); topicProducer.send(m1); + + // block the current thread until the listener gets something; this is to avoid closing + // the connection too early + l.waitForMessages(); + assertEquals(m1.getJMSMessageID(), l.getNextMessage().getJMSMessageID()); } + public void testSetMessageListenerTwice() throws Exception + { + MessageListenerImpl listener1 = new MessageListenerImpl(); + topicConsumer.setMessageListener(listener1); + + MessageListenerImpl listener2 = new MessageListenerImpl(); + topicConsumer.setMessageListener(listener2); + + consumerConnection.start(); + + Message m1 = producerSession.createMessage(); + topicProducer.send(m1); + + // block the current thread until the listener gets something; this is to avoid closing + // the connection too early + listener2.waitForMessages(); + + assertEquals(m1.getJMSMessageID(), listener2.getNextMessage().getJMSMessageID()); + assertEquals(0, listener1.size()); + } + + public void testSetMessageListenerWhileReceiving() throws Exception + { + consumerConnection.start(); + worker1= new Thread(new Runnable() + { + public void run() + { + try + { + topicConsumer.receive(); + } + catch(Exception e) + { + e.printStackTrace(); + } + }}, "Receiver"); + + worker1.start(); + + Thread.sleep(100); + + try + { + topicConsumer.setMessageListener(new MessageListenerImpl()); + fail("should have thrown JMSException"); + } + catch(JMSException e) + { + // ok + log.info(e.getMessage()); + } + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -295,10 +364,26 @@ private class MessageListenerImpl implements MessageListener { private List messages = Collections.synchronizedList(new ArrayList()); + private Object mutex = new Object(); + + /** Blocks the calling thread until at least a message is received */ + public void waitForMessages() throws InterruptedException + { + synchronized(mutex) + { + mutex.wait(); + } + } public void onMessage(Message m) { messages.add(m); + log.info("Added message " + m + " to my list"); + + synchronized(mutex) + { + mutex.notify(); + } }; public Message getNextMessage() @@ -313,6 +398,11 @@ return m; } + public int size() + { + return messages.size(); + } + public void clear() { messages.clear(); |