[Ubermq-commits] jms/src/com/ubermq/jms/client/impl LocalTopicSubscriber.java,1.14,1.15 Session.java
Brought to you by:
jimmyp
From: <ji...@us...> - 2002-09-27 21:29:05
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl
In directory usw-pr-cvs1:/tmp/cvs-serv11700/src/com/ubermq/jms/client/impl

Modified Files:
    LocalTopicSubscriber.java Session.java SimpleDeliveryManager.java
Log Message:
small bug fixes. also a new way to chain together overflow handlers.

Index: LocalTopicSubscriber.java
===================================================================
RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/LocalTopicSubscriber.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -C2 -d -r1.14 -r1.15
*** LocalTopicSubscriber.java 20 Sep 2002 15:52:22 -0000 1.14
--- LocalTopicSubscriber.java 27 Sep 2002 21:29:02 -0000 1.15
***************
*** 227,231 ****
      {
          messageListener = listener;
!         session.startDeliveryThread();
      }

--- 227,231 ----
      {
          messageListener = listener;
!         session.checkDeliveryThread();
      }

Index: Session.java
===================================================================
RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/Session.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -C2 -d -r1.9 -r1.10
*** Session.java 19 Sep 2002 17:30:31 -0000 1.9
--- Session.java 27 Sep 2002 21:29:02 -0000 1.10
***************
*** 40,44 ****

      // async consumer delivery queue
!     private Channel deliveryQueue = new BoundedPriorityQueue(SESSION_BUFFER_SIZE);
      private PooledExecutor executor;

--- 40,44 ----

      // async consumer delivery queue
!     private Channel deliveryQueue;
      private PooledExecutor executor;

***************
*** 52,55 ****
--- 52,58 ----
          this.factory = f;
          this.ackMode = ackMode;
+
+         BoundedPriorityQueue bpq = new BoundedPriorityQueue(SESSION_BUFFER_SIZE);
+         deliveryQueue = bpq;
      }

***************
*** 157,160 ****
--- 160,164 ----
      {
          this.listener = listener;
+         checkDeliveryThread();
      }

***************
*** 168,171 ****
--- 172,177 ----
       * @param msg the message to deliver
       * @param listener the JMS message listener to deliver to
+      * @throws InterruptedException if the thread is interrupted while the
+      * delivery buffer is full.
       */
      void asyncDelivery(final Message msg,
***************
*** 174,178 ****
      {
          deliveryQueue.put(new DeliveryTask(msg, listener));
-         checkDeliveryThread();
      }

--- 180,183 ----
***************
*** 222,226 ****
       * starts the delivery thread
       */
!     void startDeliveryThread()
      {
          executor = new PooledExecutor(deliveryQueue);
--- 227,231 ----
       * starts the delivery thread
       */
!     synchronized private void startDeliveryThread()
      {
          executor = new PooledExecutor(deliveryQueue);
***************
*** 228,236 ****
                      public Thread newThread(Runnable p0)
                      {
!                         Thread t = new Thread(p0);
                          t.setDaemon(false);
                          return t;
                      }
                  });
          executor.setKeepAliveTime(-1);
          executor.setMinimumPoolSize(1);
--- 233,242 ----
                      public Thread newThread(Runnable p0)
                      {
!                         Thread t = new Thread(p0, "Session Delivery Thread");
                          t.setDaemon(false);
                          return t;
                      }
                  });
+         executor.waitWhenBlocked();
          executor.setKeepAliveTime(-1);
          executor.setMinimumPoolSize(1);
***************
*** 242,246 ****
       * Ensures the delivery thread is running. Starts it if not.
       */
!     void checkDeliveryThread()
      {
          if (executor == null)
--- 248,252 ----
       * Ensures the delivery thread is running. Starts it if not.
       */
!     synchronized void checkDeliveryThread()
      {
          if (executor == null)
***************
*** 251,255 ****
       * Stops the delivery thread.
       */
!     void stopDeliveryThread()
      {
          if (executor != null) {
--- 257,261 ----
       * Stops the delivery thread.
       */
!     synchronized void stopDeliveryThread()
      {
          if (executor != null) {
***************
*** 261,265 ****
       * Pauses the session from delivering messages.
       */
!     void pause()
      {
          stopDeliveryThread();
--- 267,271 ----
       * Pauses the session from delivering messages.
       */
!     synchronized void pause()
      {
          stopDeliveryThread();
***************
*** 269,275 ****
       * Resumes delivery of messages.
       */
!     void resume()
      {
!         startDeliveryThread();
      }
  }
--- 275,281 ----
       * Resumes delivery of messages.
       */
!     synchronized void resume()
      {
!         checkDeliveryThread();
      }
  }

Index: SimpleDeliveryManager.java
===================================================================
RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/SimpleDeliveryManager.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -C2 -d -r1.6 -r1.7
*** SimpleDeliveryManager.java 25 Sep 2002 00:42:18 -0000 1.6
--- SimpleDeliveryManager.java 27 Sep 2002 21:29:02 -0000 1.7
***************
*** 27,30 ****
--- 27,40 ----
      private static final int UNSEQUENCED_FLUSH_INTERVAL =
          Integer.valueOf(Configurator.getProperty(ClientConfig.SUB_UNSEQUENCED_FLUSH_TIMEOUT, "750")).intValue();
+     static {
+         cd.setThreadFactory(new ThreadFactory() {
+             public Thread newThread(Runnable command)
+             {
+                 Thread t = new Thread(command, "SimpleDeliveryManager Resequencing Thread");
+                 t.setDaemon(true);
+                 return t;
+             }
+         });
+     }

      /**
|