[Ubermq-commits] jms/src/com/ubermq/jms/client/impl LocalTopicSubscriber.java,1.5,1.6 Session.java,1
Brought to you by:
jimmyp
From: <ji...@us...> - 2002-08-23 14:06:58
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl In directory usw-pr-cvs1:/tmp/cvs-serv27957/src/com/ubermq/jms/client/impl Modified Files: LocalTopicSubscriber.java Session.java TopicConnection.java TopicSession.java Log Message: tweaks to be more compliant with the JMS 1.0.2b spec, including: 1. TTL is honored by a new overflow handler 2. priority is honored on the client receive side 3. JMSRedelivered flag is set at appropriate times 4. Connection.stop() waits for message processing to complete before returning to the caller. 5. Some standard message properties are stored in the flag DWORD for speed 6. RuntimeException thrown from MessageListener.onMessage() is handled in accordance with the spec. Index: LocalTopicSubscriber.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/LocalTopicSubscriber.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** LocalTopicSubscriber.java 15 Aug 2002 17:16:48 -0000 1.5 --- LocalTopicSubscriber.java 23 Aug 2002 14:06:53 -0000 1.6 *************** *** 3,8 **** import com.ubermq.kernel.*; import javax.jms.*; ! import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; import com.ubermq.jms.server.datagram.IMessageDatagram; import com.ubermq.jms.server.datagram.impl.AckDatagram; --- 3,9 ---- import com.ubermq.kernel.*; import javax.jms.*; + import java.util.*; ! import EDU.oswego.cs.dl.util.concurrent.*; import com.ubermq.jms.server.datagram.IMessageDatagram; import com.ubermq.jms.server.datagram.impl.AckDatagram; *************** *** 76,82 **** private IDeliveryManager delivery; ! private BoundedBuffer receiveQueue = new BoundedBuffer(BOUNDED_BUFFER_SIZE); private boolean isClosing = false; /** * Construct a non-durable TopicSubscriber to the specified topic --- 77,87 ---- private IDeliveryManager delivery; ! 
private Channel receiveQueue; private boolean isClosing = false; + private List pausedQueue; + private boolean isPaused = false; + + /** * Construct a non-durable TopicSubscriber to the specified topic *************** *** 122,125 **** --- 127,148 ---- this.durable = false; + // setup receive q with priority ordering. + receiveQueue = new BoundedPriorityQueue(BOUNDED_BUFFER_SIZE, + new Comparator() { + + public int compare(Object o1, Object o2) + { + try + { + int pri1 = ((Message)o1).getJMSPriority(), + pri2 = ((Message)o2).getJMSPriority(); + return pri1 - pri2; + } + catch (JMSException e) { + return 0; + } + } + }); + // register myself with the subscription router. proc.registerSubscription(t.getTopicName(), *************** *** 333,337 **** if (durable) { session.conn.output(new AckDatagram(md.getMessageId(), false), ! new ExponentialBackoff(50, 2, 1000, false)); } else { // we don't use acknolwedgements for non durable subscribers --- 356,360 ---- if (durable) { session.conn.output(new AckDatagram(md.getMessageId(), false), ! 
new ExponentialBackoff()); } else { // we don't use acknolwedgements for non durable subscribers *************** *** 364,367 **** --- 387,426 ---- } catch(Exception ie) { com.ubermq.Utility.getLogger().throwing("", "", ie); + } + } + + synchronized void pause() + { + if (!isPaused) + { + isPaused = true; + + // accumulate the list of things + pausedQueue = new ArrayList(BOUNDED_BUFFER_SIZE); + + Object o; + while((o = receiveQueue.peek()) != null) + { + pausedQueue.add(o); + } + } + } + + synchronized void resume() + { + if (isPaused) + { + isPaused = false; + + Iterator iter = pausedQueue.iterator(); + while (iter.hasNext()) + { + try + { + receiveQueue.offer(iter.next(), 0); + } + catch (InterruptedException e) {} + iter.remove(); + } } } Index: Session.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/Session.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** Session.java 22 Aug 2002 15:15:07 -0000 1.4 --- Session.java 23 Aug 2002 14:06:53 -0000 1.5 *************** *** 17,27 **** protected MessageListener listener; ! // This variable keeps track if this session is closing or not. private boolean isClosing = false; // async consumer delivery queue ! private BoundedBuffer deliveryQueue = new BoundedBuffer(SESSION_BUFFER_SIZE); private PooledExecutor executor; // impl public BytesMessage createBytesMessage() --- 17,34 ---- protected MessageListener listener; ! protected final boolean transacted; ! protected final int ackMode; private boolean isClosing = false; // async consumer delivery queue ! private Channel deliveryQueue = new BoundedPriorityQueue(SESSION_BUFFER_SIZE); private PooledExecutor executor; + Session(boolean transacted, int ackMode) + { + this.transacted = transacted; + this.ackMode = ackMode; + } + // impl public BytesMessage createBytesMessage() *************** *** 109,113 **** throws JMSException { ! 
// TODO: unclear what this method is intended to accomplish } --- 116,125 ---- throws JMSException { ! // TODO: we don't currently support recovery... ! // that is because the server forgets about messages we have ! // not acknowledged unless you are durable. ! // the JMS spec says that you are supposed to be able to get unack'd messages ! // if you want... ! throw new JMSUnsupportedOperationException(); } *************** *** 131,140 **** throws InterruptedException { ! deliveryQueue.put(new Runnable() {public void run() { ! listener.onMessage(msg); ! }}); checkDeliveryThread(); } // This method will start the delivery thread. void startDeliveryThread() --- 143,192 ---- throws InterruptedException { ! deliveryQueue.put(new DeliveryTask(msg, listener)); checkDeliveryThread(); } + private class DeliveryTask + implements Runnable, Comparable + { + private Message msg; + private MessageListener listener; + + DeliveryTask(Message msg, MessageListener listener) { + this.msg = msg; + this.listener = listener; + } + + public int compareTo(Object o) + { + try + { + return msg.getJMSPriority() - ((DeliveryTask)o).msg.getJMSPriority(); + } + catch (JMSException e) { + return 0; + } + } + + public void run() { + try { + listener.onMessage(msg); + } catch(RuntimeException re) { + if (ackMode == Session.CLIENT_ACKNOWLEDGE) { + // according to JMS spec, ignore + ; + } else { + // for DUPS_OK and AUTO_ we try immediately + // one more time, and then move on. + try { + listener.onMessage(msg); + } catch(RuntimeException re2) { + ; // move on... the client is hosed prob. + } + } + } + } + } + // This method will start the delivery thread. 
void startDeliveryThread() *************** *** 163,170 **** // This method will stop the delivery thread void stopDeliveryThread() - throws JMSException { if (executor != null) executor.shutdownAfterProcessingCurrentlyQueuedTasks(); } } --- 215,231 ---- // This method will stop the delivery thread void stopDeliveryThread() { if (executor != null) executor.shutdownAfterProcessingCurrentlyQueuedTasks(); + } + + void pause() + { + stopDeliveryThread(); + } + + void resume() + { + startDeliveryThread(); } } Index: TopicConnection.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/TopicConnection.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** TopicConnection.java 22 Aug 2002 15:06:01 -0000 1.3 --- TopicConnection.java 23 Aug 2002 14:06:53 -0000 1.4 *************** *** 1,9 **** package com.ubermq.jms.client.impl; import com.ubermq.jms.client.*; import javax.jms.*; import java.io.IOException; - import java.util.HashSet; - import java.util.Set; public class TopicConnection --- 1,9 ---- package com.ubermq.jms.client.impl; + import com.ubermq.jms.client.*; + import java.util.*; import javax.jms.*; import java.io.IOException; public class TopicConnection *************** *** 12,15 **** --- 12,16 ---- { private Set localSenders; + private List sessions; public TopicConnection(IClientSession theSession, *************** *** 22,25 **** --- 23,27 ---- super(theSession, clientProc, delivery, host, port); localSenders = new HashSet(); + sessions = new ArrayList(); } *************** *** 42,67 **** throw new JMSUnsupportedOperationException(); } ! public javax.jms.TopicSession createTopicSession(boolean transacted, ! int acknowledgeMode) throws JMSException { ! return new TopicSession(this, ! clientProc, ! delivery, ! transacted, ! acknowledgeMode); } void addLocalSender(long senderId) { localSenders.add(new Long(senderId)); } ! 
void removeLocalSender(long senderId) { localSenders.remove(new Long(senderId)); } ! boolean isSenderLocal(long senderId) { --- 44,96 ---- throw new JMSUnsupportedOperationException(); } ! public javax.jms.TopicSession createTopicSession(boolean transacted, ! int acknowledgeMode) throws JMSException { ! TopicSession ts = new TopicSession(this, ! clientProc, ! delivery, ! transacted, ! acknowledgeMode); ! sessions.add(ts); ! return ts; } + + public void stop() + { + // propagate the pause message to our sessions. + Iterator iter = sessions.iterator(); + while (iter.hasNext()) + { + com.ubermq.jms.client.impl.TopicSession ts = (com.ubermq.jms.client.impl.TopicSession)iter.next(); + ts.pause(); + } + super.stop(); + } + + public void start() + { + super.start(); + + Iterator iter = sessions.iterator(); + while (iter.hasNext()) + { + com.ubermq.jms.client.impl.TopicSession ts = (com.ubermq.jms.client.impl.TopicSession)iter.next(); + ts.resume(); + } + } + void addLocalSender(long senderId) { localSenders.add(new Long(senderId)); } ! void removeLocalSender(long senderId) { localSenders.remove(new Long(senderId)); } ! boolean isSenderLocal(long senderId) { Index: TopicSession.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/TopicSession.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** TopicSession.java 14 Aug 2002 20:45:57 -0000 1.3 --- TopicSession.java 23 Aug 2002 14:06:53 -0000 1.4 *************** *** 3,13 **** import com.ubermq.jms.client.*; import com.ubermq.kernel.*; import javax.jms.*; import com.ubermq.jms.server.datagram.IAckDatagram; import com.ubermq.jms.server.datagram.IMessageDatagram; ! 
import com.ubermq.jms.server.datagram.impl.DatagramFactory; import com.ubermq.kernel.overflow.DropIncoming; - import com.ubermq.kernel.overflow.ExponentialBackoff; public class TopicSession --- 3,13 ---- import com.ubermq.jms.client.*; import com.ubermq.kernel.*; + import java.util.*; import javax.jms.*; import com.ubermq.jms.server.datagram.IAckDatagram; import com.ubermq.jms.server.datagram.IMessageDatagram; ! import com.ubermq.jms.server.proc.TTLOverflowHandler; import com.ubermq.kernel.overflow.DropIncoming; public class TopicSession *************** *** 18,26 **** protected IClientProcessor proc; protected IDeliveryManager delivery; ! protected boolean transacted; ! protected int ackMode; private static final IOverflowHandler persistentHandler = ! new ExponentialBackoff( Long.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_INITIAL_TIMEOUT, "50")).longValue(), Integer.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_BACKOFF_MULTIPLIER, "2")).intValue(), --- 18,25 ---- protected IClientProcessor proc; protected IDeliveryManager delivery; ! private List subscribers; private static final IOverflowHandler persistentHandler = ! new TTLOverflowHandler( Long.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_INITIAL_TIMEOUT, "50")).longValue(), Integer.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_BACKOFF_MULTIPLIER, "2")).intValue(), *************** *** 51,59 **** int ackMode) { this.conn = conn; this.proc = proc; this.delivery = delivery; ! this.transacted = transacted; ! this.ackMode = ackMode; } --- 50,59 ---- int ackMode) { + super(transacted, ackMode); this.conn = conn; this.proc = proc; this.delivery = delivery; ! ! this.subscribers = new ArrayList(); } *************** *** 75,79 **** throws JMSException { ! return new LocalTopicSubscriber(topic, messageSelector, noLocal, this, proc, delivery); } --- 75,81 ---- throws JMSException { ! TopicSubscriber s = new LocalTopicSubscriber(topic, messageSelector, noLocal, this, proc, delivery); ! 
subscribers.add(s); ! return s; } *************** *** 92,96 **** throws JMSException { ! return new LocalTopicSubscriber(topic, messageSelector, noLocal, name, this, proc, delivery); } --- 94,100 ---- throws JMSException { ! TopicSubscriber s = new LocalTopicSubscriber(topic, messageSelector, noLocal, name, this, proc, delivery); ! subscribers.add(s); ! return s; } *************** *** 111,114 **** --- 115,140 ---- { return new LocalTopic("$TT" + java.lang.Math.random()); + } + + void pause() + { + super.pause(); + Iterator iter = subscribers.iterator(); + while (iter.hasNext()) + { + LocalTopicSubscriber lts = (LocalTopicSubscriber)iter.next(); + lts.pause(); + } + } + + void resume() + { + super.resume(); + Iterator iter = subscribers.iterator(); + while (iter.hasNext()) + { + LocalTopicSubscriber lts = (LocalTopicSubscriber)iter.next(); + lts.resume(); + } } |