[Ubermq-commits] jms/src/com/ubermq/jms/client/proc ClientProc.java,1.12,1.12.2.1
Brought to you by:
jimmyp
From: <ji...@us...> - 2002-10-08 19:50:08
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/proc In directory usw-pr-cvs1:/tmp/cvs-serv17612/src/com/ubermq/jms/client/proc Modified Files: Tag: ubermq-1-0 ClientProc.java Log Message: multicast fixes Index: ClientProc.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/proc/ClientProc.java,v retrieving revision 1.12 retrieving revision 1.12.2.1 diff -C2 -d -r1.12 -r1.12.2.1 *** ClientProc.java 26 Sep 2002 19:52:54 -0000 1.12 --- ClientProc.java 8 Oct 2002 19:50:05 -0000 1.12.2.1 *************** *** 1,13 **** package com.ubermq.jms.client.proc; - import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; import com.ubermq.jms.server.datagram.*; import com.ubermq.jms.server.routing.*; import com.ubermq.kernel.*; import com.ubermq.jms.client.ClientConfig; - import com.ubermq.jms.server.routing.impl.Router; import com.ubermq.kernel.overflow.ExponentialBackoff; - import java.nio.ByteBuffer; - import java.util.*; /** --- 1,13 ---- package com.ubermq.jms.client.proc; import com.ubermq.jms.server.datagram.*; import com.ubermq.jms.server.routing.*; + import com.ubermq.jms.server.routing.impl.*; import com.ubermq.kernel.*; + import java.util.*; + + import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; import com.ubermq.jms.client.ClientConfig; import com.ubermq.kernel.overflow.ExponentialBackoff; /** *************** *** 19,341 **** implements com.ubermq.jms.client.IClientProcessor { ! /** ! * Our routers, one for subscriptions, one for ack's. ! */ private final IConfigurableRouter subRouter = new Router(), ! ackRouter = new Router(); ! ! /** ! * Keep a log of control datagrams that we sent in case ! * we reconnect and need to resume our state. ! */ ! private List replay; ! ! /** ! * The connection we are managing. ! */ protected IConnectionInfo managedConn; ! ! /** ! * The overflow handler to be used when sending ! * control datagrams. ! */ protected IOverflowHandler controlHandler; ! ! /** ! * The factory for creating control datagrams. ! */ ! protected IControlDatagramFactory controlFactory; ! ! /** ! * Subclasses can change this value to indicate whether the ! * main logic should send control datagrams. ! */ protected boolean fSendControlDgrams = true; ! private static final boolean SHOULD_WAIT_FOR_ACK = ! Boolean.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_SHOULD_WAIT_FOR_ACK, "false")).booleanValue(); private static final int RPC_TIMEOUT = 2000; // 2seconds for an RPC ack to come back. ! /** ! * Creates a client processor with the given control datagram factory ! * to be used to create control datagrams when appropriate. ! * @param controlFactory a control datagram factory ! */ public ClientProc(IControlDatagramFactory controlFactory) { ! this.controlFactory = controlFactory; ! this.controlHandler = new ExponentialBackoff( ! Long.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_INITIAL_TIMEOUT, "50")).longValue(), ! Integer.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_BACKOFF_MULTIPLIER, "2")).intValue(), ! Long.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_MAXIMUM_TIMEOUT, "5000")).longValue(), ! false); ! this.replay = new LinkedList(); } ! public void accept(IConnectionInfo conn) { ! this.managedConn = conn; } ! public void remove(IConnectionInfo conn) { ! this.managedConn = null; ! ! // not valid here. } ! ! /** ! * The connection has been reconnected. This indicates that the client ! * processor should restore any server-side state, as necessary. ! */ ! public void reconnected() ! { ! // replay any state that needs to be recreated on the server-side. ! Iterator iter = replay.iterator(); ! while (iter.hasNext()) ! { ! IControlDatagram cd = (IControlDatagram)iter.next(); ! controlSequence(cd, controlHandler, false); ! } ! } ! public void process(IConnectionInfo conn, IDatagram d) { ! try { ! // now we'll process it. we only care about ! // ACK datagrams (for publishers) and ! // MSG datagrams (for subscribers) ! if (d instanceof IAckDatagram) { ! ack((IAckDatagram)d); ! } ! if (d instanceof IControlDatagram) { ! ctl((IControlDatagram)d); ! } ! if (d instanceof IMessageDatagram) { ! msg((IMessageDatagram)d); ! } ! } catch(Exception x) { ! com.ubermq.Utility.getLogger().throwing("", "", x); ! } } ! private void ack(IAckDatagram ad) { ! SourceSpec ss = new MessageIdSourceSpec(ad.getAckMessageId()); ! ! Iterator acks = ackRouter.getRoutes(ss); ! if (!acks.hasNext()) return; ! ! EndpointDestNode edn = ((EndpointDestNode)acks.next()); ! edn.getEndpoint().deliver(ad); ! ! // remove the ack request. ! ackRouter.remove(ss, edn); } ! private void msg(IMessageDatagram md) { ! for(Iterator acks = subRouter.getRoutes(md.getTopicName());acks.hasNext();) ! { ! EndpointDestNode edn = ((EndpointDestNode)acks.next()); ! edn.getEndpoint().deliver(md); ! } } ! private void ctl(IControlDatagram cd) { ! // does not make sense for client-side connections ! // to be receiving control datagrams in this implementation. } ! public void registerSubscription(String spec, ! String selector, ! IDatagramEndpoint e) { ! SourceSpec ss = new TopicSourceSpec(spec); ! ! // check if there is an endpoint already registered here. ! boolean alreadyRegistered = internalRegister(ss, e); ! ! // tell the server that I care about this ! // subscription, if needed ! if (!alreadyRegistered && fSendControlDgrams) ! controlSequence(controlFactory.subscribe(spec, selector), controlHandler); } ! ! private boolean internalRegister(SourceSpec ss, IDatagramEndpoint e) { ! boolean alreadyRegistered = ! (subRouter.getRoutes(ss).hasNext()); ! ! // register with our internal router ! RouteDestNode rdn = new EndpointDestNode(e); ! subRouter.addKnownNode(rdn); ! subRouter.addRoute(ss, rdn); ! return alreadyRegistered; } ! public void registerDurableSubscription(String spec, ! String name, ! String selector, ! IDatagramEndpoint e) { ! SourceSpec ss = new TopicSourceSpec(spec); ! ! // check if there is an endpoint already registered here. ! boolean alreadyRegistered = internalRegister(ss, e); ! ! // subscribe durably ! // and recover messages from my absence. ! if (!alreadyRegistered && fSendControlDgrams) { ! controlSequence(controlFactory.durableSubscribe(name, spec, selector), controlHandler); ! controlSequence(controlFactory.durableRecover(name), controlHandler); ! } ! } ! ! public void unregisterSubscription(String spec, ! IDatagramEndpoint e) { ! SourceSpec ss = new TopicSourceSpec(spec); ! RouteDestNode rdn = new EndpointDestNode(e); ! subRouter.remove(ss, rdn); ! ! // tell server that i no longer care if there are no more in the ! // internal router. ! if (!(subRouter.getRoutes(ss).hasNext()) && fSendControlDgrams) ! { ! controlSequence(controlFactory.unsubscribe(spec), controlHandler); ! } } ! public void unregisterDurableSubscription(String name) { ! if (fSendControlDgrams) ! { ! controlSequence(controlFactory.durableUnsubscribe(name), controlHandler); ! } } ! public void durableGoingAway(String name) { ! if (fSendControlDgrams) ! { ! controlSequence(controlFactory.durableGoingAway(name), controlHandler); ! } } ! public boolean shouldWaitForAck() { ! return SHOULD_WAIT_FOR_ACK; } ! public void registerNeedAck(MessageId msgId, ! IDatagramEndpoint e) { ! RouteDestNode rdn = new EndpointDestNode(e); ! ! ackRouter.addKnownNode(rdn); ! ackRouter.addRoute(new MessageIdSourceSpec(msgId), rdn); } ! ! public boolean controlSequence(IControlDatagram d, ! IOverflowHandler h) ! { ! return controlSequence(d, h, true); ! } ! ! private boolean controlSequence(IControlDatagram d, ! IOverflowHandler h, ! boolean saveInReplayLog) ! { ! // output the data gram ! managedConn.output(d, h); ! ! // save the datagram state in our replay history ! if (saveInReplayLog) ! replay.add(d); ! ! // wait for the RPC ack to come back. ! final Object mutex = new Object(); ! final SynchronizedBoolean b = new SynchronizedBoolean(false); ! ! registerNeedAck(new MessageId(0, 0), new IDatagramEndpoint() { ! public void deliver(IDatagram d) ! { ! synchronized(mutex) { ! mutex.notify(); ! } ! b.set(!((IAckDatagram)d).isNegativeAck()); ! } ! }); ! ! try ! { ! synchronized(mutex) { ! mutex.wait(RPC_TIMEOUT); ! } ! } ! catch (InterruptedException e) {} ! ! return b.get(); ! } ! private final static class TopicSourceSpec ! extends com.ubermq.jms.server.routing.impl.RegexpSourceSpec { ! public TopicSourceSpec(String spec) ! { ! super(com.ubermq.jms.server.routing.impl.RegexpHelper.xlat(spec), ! spec); ! } } ! private final static class MessageIdSourceSpec ! implements SourceSpec { ! MessageId msgId; ! ! public MessageIdSourceSpec(MessageId msgId) {this.msgId = msgId;} ! public boolean matches(SourceSpec ss) ! { ! try { ! return (msgId.equals(((MessageIdSourceSpec)ss).msgId)); ! } catch(ClassCastException x) {return false;} ! } ! public boolean isMoreSpecificThan(SourceSpec ss) {return false;} ! public String getDisplayName() {return msgId.toString();} ! public boolean shouldCacheResults() {return false;} ! ! public boolean equals(Object obj) ! { ! return (((MessageIdSourceSpec)obj).msgId.equals(msgId)); ! } ! public int hashCode() {return msgId.hashCode();} } ! private final static class EndpointDestNode ! implements RouteDestNode { ! IDatagramEndpoint e; ! ! public EndpointDestNode(IDatagramEndpoint e) {this.e = e;} ! public String getDisplayName() {return getNodeName();} ! public String getNodeName() {return e.toString();} ! public boolean equals(Object o) { ! try { ! return (e == ((EndpointDestNode)o).e); ! } catch(ClassCastException cce ) {return false;} ! } ! public int compareTo(Object o) { ! try { ! boolean gr = (e.hashCode() > ((EndpointDestNode)o).e.hashCode()); ! boolean eq = equals(o); ! return (eq) ? 0 : (gr ? 1 : -1); ! } catch(ClassCastException cce ) {return 1;} ! } ! public int hashCode() {return e.hashCode();} ! public IDatagramEndpoint getEndpoint() {return e;} } } --- 19,342 ---- implements com.ubermq.jms.client.IClientProcessor { ! /** ! * Our routers, one for subscriptions, one for ack's. ! */ private final IConfigurableRouter subRouter = new Router(), ! ackRouter = new Router(); ! ! /** ! * Keep a log of control datagrams that we sent in case ! * we reconnect and need to resume our state. ! */ ! private List replay; ! ! /** ! * The connection we are managing. ! */ protected IConnectionInfo managedConn; ! ! /** ! * The overflow handler to be used when sending ! * control datagrams. ! */ protected IOverflowHandler controlHandler; ! ! /** ! * The factory for creating control datagrams. ! */ ! protected IControlDatagramFactory controlFactory; ! ! /** ! * Subclasses can change this value to indicate whether the ! * main logic should send control datagrams. ! */ protected boolean fSendControlDgrams = true; ! private static final boolean SHOULD_WAIT_FOR_ACK = ! Boolean.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_SHOULD_WAIT_FOR_ACK, "false")).booleanValue(); private static final int RPC_TIMEOUT = 2000; // 2seconds for an RPC ack to come back. ! /** ! * Creates a client processor with the given control datagram factory ! * to be used to create control datagrams when appropriate. ! * @param controlFactory a control datagram factory ! */ public ClientProc(IControlDatagramFactory controlFactory) { ! this.controlFactory = controlFactory; ! this.controlHandler = new ExponentialBackoff( ! Long.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_INITIAL_TIMEOUT, "50")).longValue(), ! Integer.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_BACKOFF_MULTIPLIER, "2")).intValue(), ! Long.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_MAXIMUM_TIMEOUT, "5000")).longValue(), ! false); ! this.replay = new LinkedList(); } ! public void accept(IConnectionInfo conn) { ! this.managedConn = conn; } ! public void remove(IConnectionInfo conn) { ! this.managedConn = null; ! ! // not valid here. } ! ! /** ! * The connection has been reconnected. This indicates that the client ! * processor should restore any server-side state, as necessary. ! */ ! public void reconnected() ! { ! // replay any state that needs to be recreated on the server-side. ! Iterator iter = replay.iterator(); ! while (iter.hasNext()) ! { ! IControlDatagram cd = (IControlDatagram)iter.next(); ! controlSequence(cd, controlHandler, false); ! } ! } ! public void process(IConnectionInfo conn, IDatagram d) { ! try { ! // now we'll process it. we only care about ! // ACK datagrams (for publishers) and ! // MSG datagrams (for subscribers) ! if (d instanceof IAckDatagram) { ! ack((IAckDatagram)d); ! } ! if (d instanceof IControlDatagram) { ! ctl((IControlDatagram)d); ! } ! if (d instanceof IMessageDatagram) { ! msg((IMessageDatagram)d); ! } ! } catch(Exception x) { ! com.ubermq.Utility.getLogger().throwing("", "", x); ! } } ! private void ack(IAckDatagram ad) { ! SourceSpec ss = new MessageIdSourceSpec(ad.getAckMessageId()); ! ! Iterator acks = ackRouter.getRoutes(ss); ! if (!acks.hasNext()) return; ! ! EndpointDestNode edn = ((EndpointDestNode)acks.next()); ! edn.getEndpoint().deliver(ad); ! ! // remove the ack request. ! ackRouter.remove(ss, edn); } ! private void msg(IMessageDatagram md) { ! for(Iterator acks = subRouter.getRoutes(md.getTopicName());acks.hasNext();) ! { ! EndpointDestNode edn = ((EndpointDestNode)acks.next()); ! edn.getEndpoint().deliver(md); ! } } ! private void ctl(IControlDatagram cd) { ! // does not make sense for client-side connections ! // to be receiving control datagrams in this implementation. } ! public void registerSubscription(String spec, ! String selector, ! IDatagramEndpoint e) { ! SourceSpec ss = new TopicSourceSpec(spec); ! ! // check if there is an endpoint already registered here. ! boolean alreadyRegistered = internalRegister(ss, e); ! ! // tell the server that I care about this ! // subscription, if needed ! if (!alreadyRegistered && fSendControlDgrams) ! controlSequence(controlFactory.subscribe(spec, selector), controlHandler); } ! ! private boolean internalRegister(SourceSpec ss, ! IDatagramEndpoint e) { ! boolean alreadyRegistered = ! (subRouter.getRoutes(ss).hasNext()); ! ! // register with our internal router ! RouteDestNode rdn = new EndpointDestNode(e); ! subRouter.addKnownNode(rdn); ! subRouter.addRoute(ss, rdn); ! return alreadyRegistered; } ! public void registerDurableSubscription(String spec, ! String name, ! String selector, ! IDatagramEndpoint e) { ! SourceSpec ss = new TopicSourceSpec(spec); ! ! // check if there is an endpoint already registered here. ! boolean alreadyRegistered = internalRegister(ss, e); ! ! // subscribe durably ! // and recover messages from my absence. ! if (!alreadyRegistered && fSendControlDgrams) { ! controlSequence(controlFactory.durableSubscribe(name, spec, selector), controlHandler); ! controlSequence(controlFactory.durableRecover(name), controlHandler); ! } ! } ! ! public void unregisterSubscription(String spec, ! IDatagramEndpoint e) { ! SourceSpec ss = new TopicSourceSpec(spec); ! RouteDestNode rdn = new EndpointDestNode(e); ! subRouter.remove(ss, rdn); ! ! // tell server that i no longer care if there are no more in the ! // internal router. ! if (!(subRouter.getRoutes(ss).hasNext()) && fSendControlDgrams) ! { ! controlSequence(controlFactory.unsubscribe(spec), controlHandler); ! } } ! public void unregisterDurableSubscription(String name) { ! if (fSendControlDgrams) ! { ! controlSequence(controlFactory.durableUnsubscribe(name), controlHandler); ! } } ! public void durableGoingAway(String name) { ! if (fSendControlDgrams) ! { ! controlSequence(controlFactory.durableGoingAway(name), controlHandler); ! } } ! public boolean shouldWaitForAck() { ! return SHOULD_WAIT_FOR_ACK; } ! public void registerNeedAck(MessageId msgId, ! IDatagramEndpoint e) { ! RouteDestNode rdn = new EndpointDestNode(e); ! ! ackRouter.addKnownNode(rdn); ! ackRouter.addRoute(new MessageIdSourceSpec(msgId), rdn); } ! ! public boolean controlSequence(IControlDatagram d, ! IOverflowHandler h) ! { ! return controlSequence(d, h, true); ! } ! ! private boolean controlSequence(IControlDatagram d, ! IOverflowHandler h, ! boolean saveInReplayLog) ! { ! // output the data gram ! managedConn.output(d, h); ! ! // save the datagram state in our replay history ! if (saveInReplayLog) ! replay.add(d); ! ! // wait for the RPC ack to come back. ! final Object mutex = new Object(); ! final SynchronizedBoolean b = new SynchronizedBoolean(false); ! ! registerNeedAck(new MessageId(0, 0), new IDatagramEndpoint() { ! public void deliver(IDatagram d) ! { ! synchronized(mutex) { ! mutex.notify(); ! } ! b.set(!((IAckDatagram)d).isNegativeAck()); ! } ! }); ! ! try ! { ! synchronized(mutex) { ! mutex.wait(RPC_TIMEOUT); ! } ! } ! catch (InterruptedException e) {} ! ! return b.get(); ! } ! private final static class TopicSourceSpec ! extends com.ubermq.jms.server.routing.impl.RegexpSourceSpec { ! public TopicSourceSpec(String spec) ! { ! super(com.ubermq.jms.server.routing.impl.RegexpHelper.xlat(spec), ! spec); ! } } ! private final static class MessageIdSourceSpec ! implements SourceSpec { ! MessageId msgId; ! ! public MessageIdSourceSpec(MessageId msgId) {this.msgId = msgId;} ! public boolean matches(SourceSpec ss) ! { ! try { ! return (msgId.equals(((MessageIdSourceSpec)ss).msgId)); ! } catch(ClassCastException x) {return false;} ! } ! public boolean isMoreSpecificThan(SourceSpec ss) {return false;} ! public String getDisplayName() {return msgId.toString();} ! public boolean shouldCacheResults() {return false;} ! ! public boolean equals(Object obj) ! { ! return (((MessageIdSourceSpec)obj).msgId.equals(msgId)); ! } ! public int hashCode() {return msgId.hashCode();} } ! private final static class EndpointDestNode ! implements RouteDestNode { ! IDatagramEndpoint e; ! ! public EndpointDestNode(IDatagramEndpoint e) {this.e = e;} ! public String getDisplayName() {return getNodeName();} ! public String getNodeName() {return e.toString();} ! public boolean equals(Object o) { ! try { ! return (e == ((EndpointDestNode)o).e); ! } catch(ClassCastException cce ) {return false;} ! } ! public int compareTo(Object o) { ! try { ! boolean gr = (e.hashCode() > ((EndpointDestNode)o).e.hashCode()); ! boolean eq = equals(o); ! return (eq) ? 0 : (gr ? 1 : -1); ! } catch(ClassCastException cce ) {return 1;} ! } ! public int hashCode() {return e.hashCode();} ! public IDatagramEndpoint getEndpoint() {return e;} } } |