[Ubermq-commits] jms/src/com/ubermq/jms/client/impl LocalTopicSubscriber.java,1.10,1.11 Session.java
Brought to you by:
jimmyp
From: <ji...@us...> - 2002-09-17 15:21:24
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl In directory usw-pr-cvs1:/tmp/cvs-serv24404/src/com/ubermq/jms/client/impl Modified Files: LocalTopicSubscriber.java Session.java Log Message: statistics gathering tester, plus some bug fixes Index: LocalTopicSubscriber.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/LocalTopicSubscriber.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** LocalTopicSubscriber.java 6 Sep 2002 15:55:57 -0000 1.10 --- LocalTopicSubscriber.java 17 Sep 2002 15:21:18 -0000 1.11 *************** *** 233,236 **** --- 233,237 ---- { messageListener = listener; + session.startDeliveryThread(); } Index: Session.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/Session.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** Session.java 11 Sep 2002 18:35:07 -0000 1.7 --- Session.java 17 Sep 2002 15:21:18 -0000 1.8 *************** *** 1,237 **** ! package com.ubermq.jms.client.impl; ! ! import EDU.oswego.cs.dl.util.concurrent.*; ! import com.ubermq.jms.client.*; ! import com.ubermq.jms.client.msg.*; ! import javax.jms.*; ! ! import com.ubermq.jms.server.datagram.IMessageDatagramFactory; ! import com.ubermq.kernel.Configurator; ! ! class Session ! implements javax.jms.Session ! { ! private static final int SESSION_BUFFER_SIZE = Integer.valueOf(Configurator.getProperty(ClientConfig.SESSION_BOUNDED_BUFFER_SIZE, "100")).intValue(); ! ! // message creation ! protected IMessageDatagramFactory factory; ! ! // not seemingly used. ! protected MessageListener listener; ! ! protected final boolean transacted; ! protected final int ackMode; ! private boolean isClosing = false; ! ! // async consumer delivery queue ! private Channel deliveryQueue = new BoundedPriorityQueue(SESSION_BUFFER_SIZE); ! private PooledExecutor executor; ! ! Session(IMessageDatagramFactory f, boolean transacted, int ackMode) ! { ! this.factory = f; ! this.transacted = transacted; ! this.ackMode = ackMode; ! } ! ! // impl ! public BytesMessage createBytesMessage() ! throws JMSException ! { ! return new LocalBytesMessage(factory); ! } ! ! public MapMessage createMapMessage() ! throws JMSException ! { ! return new LocalMapMessage(factory); ! } ! ! public Message createMessage() ! throws JMSException ! { ! return new LocalMessage(factory); ! } ! ! public ObjectMessage createObjectMessage() ! throws JMSException ! { ! return new LocalObjectMessage(factory); ! } ! ! public ObjectMessage createObjectMessage(java.io.Serializable object) ! throws JMSException ! { ! ObjectMessage om = createObjectMessage(); ! om.setObject(object); ! return om; ! } ! ! public StreamMessage createStreamMessage() ! throws JMSException ! { ! return new LocalStreamMessage(factory); ! } ! ! public TextMessage createTextMessage() ! throws JMSException ! { ! return new LocalTextMessage(factory); ! } ! ! public TextMessage createTextMessage(java.lang.String text) ! throws JMSException ! { ! TextMessage tm = createTextMessage(); ! tm.setText(text); ! return tm; ! } ! ! public boolean getTransacted() ! throws JMSException ! { ! return false; ! } ! ! public void commit() ! throws JMSException ! { ! // TODO: implement transactional?? ! // silently ignore this, because commit's are OK. ! } ! ! public void rollback() ! throws JMSException ! { ! // TODO: implement transactional?? ! // this won't work as anticipated, so throw unsupported operation. ! throw new JMSUnsupportedOperationException(); ! } ! ! boolean isClosing() {return isClosing;} ! ! public void close() throws JMSException ! { ! isClosing = true; ! stopDeliveryThread(); ! } ! ! public void recover() ! throws JMSException ! { ! // TODO: we don't currently support recovery... ! // that is because the server forgets about messages we have ! // not acknowledged unless you are durable. ! // the JMS spec says that you are supposed to be able to get unack'd messages ! // if you want... ! throw new JMSUnsupportedOperationException(); ! } ! ! public MessageListener getMessageListener() ! throws JMSException ! { ! return listener; ! } ! ! public void setMessageListener(MessageListener listener) ! throws JMSException ! { ! this.listener = listener; ! } ! ! public void run() ! { ! } ! ! void asyncDelivery(final Message msg, final MessageListener listener) ! throws InterruptedException ! { ! deliveryQueue.put(new DeliveryTask(msg, listener)); ! checkDeliveryThread(); ! } ! ! private class DeliveryTask ! implements Runnable, Comparable ! { ! private Message msg; ! private MessageListener listener; ! ! DeliveryTask(Message msg, MessageListener listener) { ! this.msg = msg; ! this.listener = listener; ! } ! ! public int compareTo(Object o) ! { ! try ! { ! return msg.getJMSPriority() - ((DeliveryTask)o).msg.getJMSPriority(); ! } ! catch (JMSException e) { ! return 0; ! } ! } ! ! public void run() { ! try { ! listener.onMessage(msg); ! } catch(RuntimeException re) { ! if (ackMode == Session.CLIENT_ACKNOWLEDGE) { ! // according to JMS spec, ignore ! ; ! } else { ! // for DUPS_OK and AUTO_ we try immediately ! // one more time, and then move on. ! try { ! listener.onMessage(msg); ! } catch(RuntimeException re2) { ! ; // move on... the client is hosed prob. ! } ! } ! } ! } ! } ! ! // This method will start the delivery thread. ! void startDeliveryThread() ! { ! executor = new PooledExecutor(deliveryQueue); ! executor.setThreadFactory( new ThreadFactory() { ! public Thread newThread(Runnable p0) ! { ! Thread t = new Thread(p0); ! t.setDaemon(true); ! return t; ! } ! }); ! executor.setKeepAliveTime(-1); ! executor.setMinimumPoolSize(1); ! executor.setMaximumPoolSize(1); ! executor.createThreads(1); ! } ! ! void checkDeliveryThread() ! { ! if (executor == null) ! startDeliveryThread(); ! } ! ! // This method will stop the delivery thread ! void stopDeliveryThread() ! { ! if (executor != null) { ! executor.shutdownNow(); ! } ! } ! ! void pause() ! { ! stopDeliveryThread(); ! } ! ! void resume() ! { ! startDeliveryThread(); ! } ! } ! --- 1,237 ---- ! package com.ubermq.jms.client.impl; ! ! import EDU.oswego.cs.dl.util.concurrent.*; ! import com.ubermq.jms.client.*; ! import com.ubermq.jms.client.msg.*; ! import javax.jms.*; ! ! import com.ubermq.jms.server.datagram.IMessageDatagramFactory; ! import com.ubermq.kernel.Configurator; ! ! class Session ! implements javax.jms.Session ! { ! private static final int SESSION_BUFFER_SIZE = Integer.valueOf(Configurator.getProperty(ClientConfig.SESSION_BOUNDED_BUFFER_SIZE, "100")).intValue(); ! ! // message creation ! protected IMessageDatagramFactory factory; ! ! // not seemingly used. ! protected MessageListener listener; ! ! protected final boolean transacted; ! protected final int ackMode; ! private boolean isClosing = false; ! ! // async consumer delivery queue ! private Channel deliveryQueue = new BoundedPriorityQueue(SESSION_BUFFER_SIZE); ! private PooledExecutor executor; ! ! Session(IMessageDatagramFactory f, boolean transacted, int ackMode) ! { ! this.factory = f; ! this.transacted = transacted; ! this.ackMode = ackMode; ! } ! ! // impl ! public BytesMessage createBytesMessage() ! throws JMSException ! { ! return new LocalBytesMessage(factory); ! } ! ! public MapMessage createMapMessage() ! throws JMSException ! { ! return new LocalMapMessage(factory); ! } ! ! public Message createMessage() ! throws JMSException ! { ! return new LocalMessage(factory); ! } ! ! public ObjectMessage createObjectMessage() ! throws JMSException ! { ! return new LocalObjectMessage(factory); ! } ! ! public ObjectMessage createObjectMessage(java.io.Serializable object) ! throws JMSException ! { ! ObjectMessage om = createObjectMessage(); ! om.setObject(object); ! return om; ! } ! ! public StreamMessage createStreamMessage() ! throws JMSException ! { ! return new LocalStreamMessage(factory); ! } ! ! public TextMessage createTextMessage() ! throws JMSException ! { ! return new LocalTextMessage(factory); ! } ! ! public TextMessage createTextMessage(java.lang.String text) ! throws JMSException ! { ! TextMessage tm = createTextMessage(); ! tm.setText(text); ! return tm; ! } ! ! public boolean getTransacted() ! throws JMSException ! { ! return false; ! } ! ! public void commit() ! throws JMSException ! { ! // TODO: implement transactional?? ! // silently ignore this, because commit's are OK. ! } ! ! public void rollback() ! throws JMSException ! { ! // TODO: implement transactional?? ! // this won't work as anticipated, so throw unsupported operation. ! throw new JMSUnsupportedOperationException(); ! } ! ! boolean isClosing() {return isClosing;} ! ! public void close() throws JMSException ! { ! isClosing = true; ! stopDeliveryThread(); ! } ! ! public void recover() ! throws JMSException ! { ! // TODO: we don't currently support recovery... ! // that is because the server forgets about messages we have ! // not acknowledged unless you are durable. ! // the JMS spec says that you are supposed to be able to get unack'd messages ! // if you want... ! throw new JMSUnsupportedOperationException(); ! } ! ! public MessageListener getMessageListener() ! throws JMSException ! { ! return listener; ! } ! ! public void setMessageListener(MessageListener listener) ! throws JMSException ! { ! this.listener = listener; ! } ! ! public void run() ! { ! } ! ! void asyncDelivery(final Message msg, final MessageListener listener) ! throws InterruptedException ! { ! deliveryQueue.put(new DeliveryTask(msg, listener)); ! checkDeliveryThread(); ! } ! ! private class DeliveryTask ! implements Runnable, Comparable ! { ! private Message msg; ! private MessageListener listener; ! ! DeliveryTask(Message msg, MessageListener listener) { ! this.msg = msg; ! this.listener = listener; ! } ! ! public int compareTo(Object o) ! { ! try ! { ! return msg.getJMSPriority() - ((DeliveryTask)o).msg.getJMSPriority(); ! } ! catch (JMSException e) { ! return 0; ! } ! } ! ! public void run() { ! try { ! listener.onMessage(msg); ! } catch(RuntimeException re) { ! if (ackMode == Session.CLIENT_ACKNOWLEDGE) { ! // according to JMS spec, ignore ! ; ! } else { ! // for DUPS_OK and AUTO_ we try immediately ! // one more time, and then move on. ! try { ! listener.onMessage(msg); ! } catch(RuntimeException re2) { ! ; // move on... the client is hosed prob. ! } ! } ! } ! } ! } ! ! // This method will start the delivery thread. ! void startDeliveryThread() ! { ! executor = new PooledExecutor(deliveryQueue); ! executor.setThreadFactory( new ThreadFactory() { ! public Thread newThread(Runnable p0) ! { ! Thread t = new Thread(p0); ! t.setDaemon(false); ! return t; ! } ! }); ! executor.setKeepAliveTime(-1); ! executor.setMinimumPoolSize(1); ! executor.setMaximumPoolSize(1); ! executor.createThreads(1); ! } ! ! void checkDeliveryThread() ! { ! if (executor == null) ! startDeliveryThread(); ! } ! ! // This method will stop the delivery thread ! void stopDeliveryThread() ! { ! if (executor != null) { ! executor.shutdownNow(); ! } ! } ! ! void pause() ! { ! stopDeliveryThread(); ! } ! ! void resume() ! { ! startDeliveryThread(); ! } ! } ! |