[Ubermq-commits] jms/src/com/ubermq/jms/server/proc DatagramProc.java,1.32,1.33
Brought to you by:
jimmyp
From: <ji...@us...> - 2002-12-12 18:01:53
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc In directory sc8-pr-cvs1:/tmp/cvs-serv797/src/com/ubermq/jms/server/proc Modified Files: DatagramProc.java Log Message: JMS 1.1 support (full backwards compatibility) queue support (incomplete - functional, but w/o administrative capabilities) Index: DatagramProc.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc/DatagramProc.java,v retrieving revision 1.32 retrieving revision 1.33 diff -C2 -d -r1.32 -r1.33 *** DatagramProc.java 7 Dec 2002 20:21:09 -0000 1.32 --- DatagramProc.java 12 Dec 2002 18:01:50 -0000 1.33 *************** *** 1,4 **** --- 1,5 ---- package com.ubermq.jms.server.proc; + import com.ubermq.jms.client.impl.*; import com.ubermq.jms.server.*; import com.ubermq.jms.server.admin.*; *************** *** 35,38 **** --- 36,42 ---- private static final String OVERFLOW_HANDLER_INIT = Configurator.getProperty(ServerConfig.DGP_OVERFLOW_HANDLER_INIT, ""); + // the prefix for queue subscription names + private static final String QUEUE_NAME_PREFIX = "$queue."; + // member variables. private ISettingsRepository journal; *************** *** 218,221 **** --- 222,246 ---- new ConnectionDestNode(conn)); } + else if (command instanceof IQueueStartDatagram) + { + IQueueStartDatagram iqs = ((IQueueStartDatagram)command); + setupDurableProxy(conn, + new RoundRobinArbiter(), + QUEUE_NAME_PREFIX + iqs.getQueueName(), + LocalQueue.getTopicForQueueName(iqs.getQueueName()).getTopicName()); + } + else if (command instanceof IQueueStopDatagram) + { + IQueueStopDatagram iqs = ((IQueueStopDatagram)command); + + DurableSubscriptionProxy proxy = getDurableSubscriber(QUEUE_NAME_PREFIX + iqs.getQueueName()); + if (proxy != null) { + // remove the proxy from the ack table. + unregisterDurableSubscriber(proxy); + + // go to disconnected mode on this connection. + proxy.disconnect(new ConnectionDestNode(conn)); + } + } else if (command instanceof IDurableSubscribeDatagram) { *************** *** 231,250 **** IDurableUnsubscribeDatagram sd = ((IDurableUnsubscribeDatagram)command); ! DurableSubscriptionProxy proxy = getDurableSubscriber(sd.getSubscriptionName()); ! if (proxy != null) { ! // remove the proxy from the ack table and from ! // the subscription table and from the list ! // of all durables. ! unregisterDurableSubscriber(proxy); ! removeDurableSubscriber(proxy); ! ! try ! { ! proxy.close(); ! } ! finally { ! proxy.unsubscribe(); ! } ! } } else if (command instanceof IDurableRecoverDatagram) --- 256,260 ---- IDurableUnsubscribeDatagram sd = ((IDurableUnsubscribeDatagram)command); ! unsubscribeDurableSubscriber(sd.getSubscriptionName()); } else if (command instanceof IDurableRecoverDatagram) *************** *** 475,478 **** --- 485,512 ---- { return (DurableSubscriptionProxy)durableSubscribers.get(name); + } + + /** + * Unsubscribes an existing durable subscriber. + */ + private void unsubscribeDurableSubscriber(String name) + { + DurableSubscriptionProxy proxy = getDurableSubscriber(name); + if (proxy != null) { + // remove the proxy from the ack table and from + // the subscription table and from the list + // of all durables. + unregisterDurableSubscriber(proxy); + removeDurableSubscriber(proxy); + + try + { + proxy.close(); + } + finally { + proxy.unsubscribe(); + } + } + } |