ubermq-commits Mailing List for UberMQ
Brought to you by:
jimmyp
You can subscribe to this list here.
2002 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
(43) |
Sep
(204) |
Oct
(178) |
Nov
(38) |
Dec
(67) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2003 |
Jan
(63) |
Feb
(11) |
Mar
(5) |
Apr
(1) |
May
(8) |
Jun
(29) |
Jul
(3) |
Aug
(3) |
Sep
(5) |
Oct
|
Nov
|
Dec
|
2004 |
Jan
(64) |
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: <ji...@us...> - 2004-01-21 02:05:58
|
Update of /cvsroot/ubermq/jms In directory sc8-pr-cvs1:/tmp/cvs-serv28341 Modified Files: build.xml README Added Files: .project Log Message: bug fixes related to read/write initialization race conditions --- NEW FILE: .project --- <?xml version="1.0" encoding="UTF-8"?> <projectDescription> <name>ubermq</name> <comment></comment> <projects> </projects> <buildSpec> <buildCommand> <name>org.eclipse.jdt.core.javabuilder</name> <arguments> </arguments> </buildCommand> </buildSpec> <natures> <nature>org.eclipse.jdt.core.javanature</nature> </natures> </projectDescription> Index: build.xml =================================================================== RCS file: /cvsroot/ubermq/jms/build.xml,v retrieving revision 1.25 retrieving revision 1.26 diff -C2 -d -r1.25 -r1.26 *** build.xml 16 Jun 2003 14:57:52 -0000 1.25 --- build.xml 21 Jan 2004 02:05:25 -0000 1.26 *************** *** 43,46 **** --- 43,53 ---- <target name="compile" depends="init"> + <copy todir="${build}"> + <fileset dir="${src}" includes="*.properties"/> + </copy> + <copy todir="${build}/images"> + <fileset dir="${images}"/> + </copy> + <javac srcdir="${src}" destdir="${build}" *************** *** 57,70 **** classname="com.ubermq.jms.server.MessageServer" dir="${bin}" ! fork="true"/> </target> <target name="dist" depends="compile"> - <copy todir="${build}/images"> - <fileset dir="${images}"/> - </copy> <jar jarfile="${dist}/${ant.project.name}.jar" basedir="${build}"> <fileset dir="${build}" includes="*"/> ! <zipgroupfileset dir="${lib}" includes="concurrent.jar,lrmp.jar"/> </jar> </target> --- 64,82 ---- classname="com.ubermq.jms.server.MessageServer" dir="${bin}" ! fork="true"> ! <jvmarg line="-Dlog4j.configuration=log4j-debug.properties"/> ! </java> </target> <target name="dist" depends="compile"> <jar jarfile="${dist}/${ant.project.name}.jar" basedir="${build}"> <fileset dir="${build}" includes="*"/> ! <zipgroupfileset dir="${lib}" includes="*.jar"/> ! </jar> ! <jar jarfile="${dist}/${ant.project.name}-client.jar" basedir="${build}"> ! <include name="**/*.class"/> ! <exclude name="**/ui/**"/> ! <exclude name="**/server/**"/> ! <zipgroupfileset dir="${lib}" includes="lrmp.jar,concurrent.jar"/> </jar> </target> *************** *** 72,82 **** <target name="test" depends="dist"> <junit printsummary="withOutAndErr" ! haltonfailure="no"> <classpath> <pathelement location="${dist}/${ant.project.name}.jar"/> ! <fileset dir="lib"> ! <include name="junit.jar"/> ! </fileset> ! </classpath> <batchtest> --- 84,97 ---- <target name="test" depends="dist"> <junit printsummary="withOutAndErr" ! haltonfailure="no" ! showoutput="yes" ! fork="no"> ! <jvmarg value="-Dlog4j.configuration=log4j-debug.properties"/> <classpath> <pathelement location="${dist}/${ant.project.name}.jar"/> ! <fileset dir="lib"> ! <include name="junit.jar"/> ! </fileset> ! </classpath> <batchtest> Index: README =================================================================== RCS file: /cvsroot/ubermq/jms/README,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** README 12 Mar 2003 14:44:46 -0000 1.14 --- README 21 Jan 2004 02:05:25 -0000 1.15 *************** *** 1,3 **** ! UberMQ 2.1 ----------- January 31, 2003 --- 1,3 ---- ! UberMQ 2.x ----------- January 31, 2003 |
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/msg In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/client/msg Modified Files: LocalMapMessage.java LocalStreamMessage.java LocalTextMessage.java LocalBytesMessage.java AbstractStreamMessage.java LocalObjectMessage.java Log Message: bug fixes related to read/write initialization race conditions Index: LocalMapMessage.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/msg/LocalMapMessage.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** LocalMapMessage.java 30 Jan 2003 14:42:18 -0000 1.6 --- LocalMapMessage.java 21 Jan 2004 02:05:25 -0000 1.7 *************** *** 3,7 **** import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; ! import com.ubermq.jms.server.datagram.*; import java.util.*; import javax.jms.*; --- 3,7 ---- import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; ! import com.ubermq.jms.common.datagram.*; import java.util.*; import javax.jms.*; Index: LocalStreamMessage.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/msg/LocalStreamMessage.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** LocalStreamMessage.java 30 Jan 2003 14:42:19 -0000 1.6 --- LocalStreamMessage.java 21 Jan 2004 02:05:25 -0000 1.7 *************** *** 2,6 **** import com.ubermq.jms.client.*; ! import com.ubermq.jms.server.datagram.*; import java.io.*; import javax.jms.*; --- 2,6 ---- import com.ubermq.jms.client.*; ! import com.ubermq.jms.common.datagram.*; import java.io.*; import javax.jms.*; Index: LocalTextMessage.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/msg/LocalTextMessage.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** LocalTextMessage.java 7 May 2003 17:41:46 -0000 1.9 --- LocalTextMessage.java 21 Jan 2004 02:05:25 -0000 1.10 *************** *** 1,9 **** package com.ubermq.jms.client.msg; - import com.ubermq.Utility; import com.ubermq.jms.client.IAcknowledgeHandler; import com.ubermq.jms.client.impl.LocalMessage; ! import com.ubermq.jms.server.datagram.IMessageDatagram; ! import com.ubermq.jms.server.datagram.IMessageDatagramFactory; import javax.jms.JMSException; import javax.jms.MessageNotWriteableException; --- 1,10 ---- package com.ubermq.jms.client.msg; import com.ubermq.jms.client.IAcknowledgeHandler; import com.ubermq.jms.client.impl.LocalMessage; ! import com.ubermq.jms.common.datagram.IMessageDatagram; ! import com.ubermq.jms.common.datagram.IMessageDatagramFactory; ! import com.ubermq.util.*; ! import javax.jms.JMSException; import javax.jms.MessageNotWriteableException; Index: LocalBytesMessage.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/msg/LocalBytesMessage.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** LocalBytesMessage.java 17 Dec 2002 14:52:24 -0000 1.6 --- LocalBytesMessage.java 21 Jan 2004 02:05:25 -0000 1.7 *************** *** 2,6 **** import com.ubermq.jms.client.*; ! import com.ubermq.jms.server.datagram.*; /** --- 2,6 ---- import com.ubermq.jms.client.*; ! import com.ubermq.jms.common.datagram.*; /** Index: AbstractStreamMessage.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/msg/AbstractStreamMessage.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** AbstractStreamMessage.java 14 Feb 2003 13:33:22 -0000 1.9 --- AbstractStreamMessage.java 21 Jan 2004 02:05:25 -0000 1.10 *************** *** 4,8 **** import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; ! import com.ubermq.jms.server.datagram.*; import java.io.*; import java.nio.*; --- 4,10 ---- import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; ! import com.ubermq.jms.common.datagram.*; ! import com.ubermq.util.*; ! import java.io.*; import java.nio.*; Index: LocalObjectMessage.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/msg/LocalObjectMessage.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** LocalObjectMessage.java 30 Jan 2003 14:42:19 -0000 1.9 --- LocalObjectMessage.java 21 Jan 2004 02:05:25 -0000 1.10 *************** *** 2,6 **** import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; ! import com.ubermq.jms.server.datagram.*; import javax.jms.*; --- 2,6 ---- import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; ! import com.ubermq.jms.common.datagram.*; import javax.jms.*; |
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/client/impl Modified Files: LocalMessage.java QueueReceiver.java QueueSender.java Session.java LocalTopicSubscriber.java TopicSession.java AbstractConsumer.java SimpleDeliveryManager.java AbstractClientSession.java AbstractProducer.java Connection.java QueueSession.java Log Message: bug fixes related to read/write initialization race conditions Index: LocalMessage.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/LocalMessage.java,v retrieving revision 1.19 retrieving revision 1.20 diff -C2 -d -r1.19 -r1.20 *** LocalMessage.java 7 May 2003 17:41:46 -0000 1.19 --- LocalMessage.java 21 Jan 2004 02:05:24 -0000 1.20 *************** *** 4,9 **** import com.ubermq.jms.client.*; import com.ubermq.jms.client.msg.*; ! import com.ubermq.jms.server.datagram.*; import com.ubermq.kernel.*; import EDU.oswego.cs.dl.util.concurrent.*; import java.nio.*; --- 4,11 ---- import com.ubermq.jms.client.*; import com.ubermq.jms.client.msg.*; ! import com.ubermq.jms.common.datagram.*; import com.ubermq.kernel.*; + import com.ubermq.util.*; + import EDU.oswego.cs.dl.util.concurrent.*; import java.nio.*; Index: QueueReceiver.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/QueueReceiver.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** QueueReceiver.java 19 Dec 2002 22:36:22 -0000 1.3 --- QueueReceiver.java 21 Jan 2004 02:05:24 -0000 1.4 *************** *** 3,7 **** import EDU.oswego.cs.dl.util.concurrent.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.server.datagram.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; --- 3,7 ---- import EDU.oswego.cs.dl.util.concurrent.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.common.datagram.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; Index: QueueSender.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/QueueSender.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** QueueSender.java 17 Dec 2002 14:52:24 -0000 1.2 --- QueueSender.java 21 Jan 2004 02:05:24 -0000 1.3 *************** *** 2,6 **** import com.ubermq.jms.client.*; ! import com.ubermq.jms.server.datagram.*; import java.io.*; import java.util.*; --- 2,6 ---- import com.ubermq.jms.client.*; ! import com.ubermq.jms.common.datagram.*; import java.io.*; import java.util.*; Index: Session.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/Session.java,v retrieving revision 1.20 retrieving revision 1.21 diff -C2 -d -r1.20 -r1.21 *** Session.java 12 Jan 2004 19:24:24 -0000 1.20 --- Session.java 21 Jan 2004 02:05:24 -0000 1.21 *************** *** 4,8 **** import com.ubermq.jms.client.*; import com.ubermq.jms.client.msg.*; ! import com.ubermq.jms.server.datagram.*; import com.ubermq.kernel.*; import java.util.*; --- 4,8 ---- import com.ubermq.jms.client.*; import com.ubermq.jms.client.msg.*; ! import com.ubermq.jms.common.datagram.*; import com.ubermq.kernel.*; import java.util.*; *************** *** 109,113 **** private String createRandomIdentifier() { ! return com.ubermq.Utility.allocateLocallyUniqueInt() + "-" + new Random().nextInt(); } --- 109,113 ---- private String createRandomIdentifier() { ! return com.ubermq.util.Utility.allocateLocallyUniqueInt() + "-" + new Random().nextInt(); } *************** *** 439,443 **** } } catch(RuntimeException re) { ! com.ubermq.Utility.getLogger().error("", re); if (ackMode == Session.CLIENT_ACKNOWLEDGE) { // according to JMS spec, ignore --- 439,443 ---- } } catch(RuntimeException re) { ! com.ubermq.util.Utility.getLogger().error("", re); if (ackMode == Session.CLIENT_ACKNOWLEDGE) { // according to JMS spec, ignore Index: LocalTopicSubscriber.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/LocalTopicSubscriber.java,v retrieving revision 1.27 retrieving revision 1.28 diff -C2 -d -r1.27 -r1.28 *** LocalTopicSubscriber.java 12 Jan 2004 19:24:24 -0000 1.27 --- LocalTopicSubscriber.java 21 Jan 2004 02:05:24 -0000 1.28 *************** *** 3,11 **** import EDU.oswego.cs.dl.util.concurrent.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.server.datagram.*; ! import com.ubermq.jms.server.routing.*; ! import com.ubermq.jms.server.routing.impl.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; import com.ubermq.*; import java.io.*; --- 3,12 ---- import EDU.oswego.cs.dl.util.concurrent.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.common.datagram.*; ! import com.ubermq.jms.common.routing.*; ! import com.ubermq.jms.common.routing.impl.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; + import com.ubermq.util.*; import com.ubermq.*; import java.io.*; Index: TopicSession.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/TopicSession.java,v retrieving revision 1.22 retrieving revision 1.23 diff -C2 -d -r1.22 -r1.23 *** TopicSession.java 12 Dec 2002 18:01:48 -0000 1.22 --- TopicSession.java 21 Jan 2004 02:05:24 -0000 1.23 *************** *** 1,15 **** package com.ubermq.jms.client.impl; - import com.ubermq.*; - import com.ubermq.jms.client.*; - import com.ubermq.jms.client.msg.*; - import com.ubermq.jms.server.datagram.*; - import com.ubermq.jms.server.proc.*; - import com.ubermq.kernel.*; - import com.ubermq.kernel.overflow.*; - import java.io.*; - import java.util.*; import javax.jms.*; /** * The topic session is a core JMS abstraction representing a set of publishers --- 1,9 ---- package com.ubermq.jms.client.impl; import javax.jms.*; + import com.ubermq.jms.common.datagram.*; + import com.ubermq.kernel.*; + /** * The topic session is a core JMS abstraction representing a set of publishers Index: AbstractConsumer.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/AbstractConsumer.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** AbstractConsumer.java 12 Jan 2004 19:24:24 -0000 1.5 --- AbstractConsumer.java 21 Jan 2004 02:05:24 -0000 1.6 *************** *** 3,9 **** import EDU.oswego.cs.dl.util.concurrent.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.server.datagram.*; ! import com.ubermq.jms.server.routing.*; ! import com.ubermq.jms.server.routing.impl.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; --- 3,9 ---- import EDU.oswego.cs.dl.util.concurrent.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.common.datagram.*; ! import com.ubermq.jms.common.routing.*; ! import com.ubermq.jms.common.routing.impl.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; *************** *** 307,311 **** } } catch(Exception ie) { ! com.ubermq.Utility.getLogger().error("", ie); } } --- 307,311 ---- } } catch(Exception ie) { ! com.ubermq.util.Utility.getLogger().error("", ie); } } Index: SimpleDeliveryManager.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/SimpleDeliveryManager.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** SimpleDeliveryManager.java 12 Jan 2004 19:24:24 -0000 1.10 --- SimpleDeliveryManager.java 21 Jan 2004 02:05:24 -0000 1.11 *************** *** 3,7 **** import com.ubermq.jms.client.*; import com.ubermq.kernel.*; ! import com.ubermq.Utility; import java.util.*; import javax.jms.Message; --- 3,8 ---- import com.ubermq.jms.client.*; import com.ubermq.kernel.*; ! import com.ubermq.util.*; ! import java.util.*; import javax.jms.Message; Index: AbstractClientSession.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/AbstractClientSession.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** AbstractClientSession.java 12 Jan 2004 19:24:24 -0000 1.10 --- AbstractClientSession.java 21 Jan 2004 02:05:24 -0000 1.11 *************** *** 55,65 **** /** ! * Registers the connection with the I/o threads. */ public void addConnection(ConnectionInfo conn) throws IOException { ! read.register(conn); ! write.register(conn); } } --- 55,70 ---- /** ! * Registers the connection with the I/o threads. <P> ! * ! * JP note 1/20/04: It is <B>HIGHLY</b> important ! * to register the write thread prior to registering the read thread. Otherwise, ! * it is possible to run into situations where the write end is not prepared to respond to ! * incoming messages that are already in the connection buffer at the time of registration.<P> */ public void addConnection(ConnectionInfo conn) throws IOException { ! write.register(conn, true); ! read.register(conn, true); } } Index: AbstractProducer.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/AbstractProducer.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** AbstractProducer.java 12 Jan 2004 19:24:24 -0000 1.7 --- AbstractProducer.java 21 Jan 2004 02:05:24 -0000 1.8 *************** *** 1,14 **** package com.ubermq.jms.client.impl; import com.ubermq.*; import com.ubermq.jms.client.*; import com.ubermq.jms.client.msg.*; ! import com.ubermq.jms.server.datagram.*; ! import com.ubermq.jms.server.proc.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; ! import java.io.*; ! import java.util.*; ! import javax.jms.*; /** --- 1,17 ---- package com.ubermq.jms.client.impl; + import java.io.*; + import java.util.*; + + import javax.jms.*; + import com.ubermq.*; import com.ubermq.jms.client.*; import com.ubermq.jms.client.msg.*; ! import com.ubermq.jms.common.datagram.*; ! import com.ubermq.jms.common.overflow.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; ! import com.ubermq.util.*; /** *************** *** 22,25 **** --- 25,31 ---- implements IDatagramEndpoint, javax.jms.MessageProducer { + private static final org.apache.log4j.Logger log = + org.apache.log4j.Logger.getLogger(AbstractProducer.class); + private final Destination d; private final Session s; *************** *** 80,84 **** { nextSequence = 0; ! senderId = com.ubermq.Utility.allocateGloballyUniqueLong(); s.conn.addLocalSender(senderId); } --- 86,90 ---- { nextSequence = 0; ! senderId = com.ubermq.util.Utility.allocateGloballyUniqueLong(); s.conn.addLocalSender(senderId); } *************** *** 316,319 **** --- 322,326 ---- // wait for it + log.debug(proc + " waiting for msg ACK " + waitingFor + ", " + MAX_ACK_WAIT_TIMEOUT + "ms"); this.wait(MAX_ACK_WAIT_TIMEOUT); *************** *** 321,327 **** // check that it is valid, and that it is not a NACK if (acknowledgement == null) ! throw new JMSUndeliverableException("Message was not acknowledged in the timeout period."); else if (acknowledgement.isNegativeAck()) ! throw new JMSUndeliverableException("Message was not successfully delivered."); } catch(InterruptedException ie) { --- 328,344 ---- // check that it is valid, and that it is not a NACK if (acknowledgement == null) ! { ! log.debug("no ack for " + waitingFor + " on " + proc); ! throw new JMSUndeliverableException("Message " + waitingFor + " was not acknowledged in the timeout period."); ! } else if (acknowledgement.isNegativeAck()) ! { ! log.debug("negative ack recv'd for " + waitingFor + " on " + proc); ! throw new JMSUndeliverableException("Message " + waitingFor + " was not successfully delivered."); ! } ! else ! { ! log.debug(waitingFor + " acknowledged"); ! } } catch(InterruptedException ie) { Index: Connection.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/Connection.java,v retrieving revision 1.33 retrieving revision 1.34 diff -C2 -d -r1.33 -r1.34 *** Connection.java 12 Jan 2004 19:24:24 -0000 1.33 --- Connection.java 21 Jan 2004 02:05:24 -0000 1.34 *************** *** 3,10 **** import com.ubermq.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.server.datagram.*; import com.ubermq.kernel.*; import com.ubermq.kernel.event.*; import com.ubermq.kernel.overflow.*; import java.io.*; import java.util.*; --- 3,12 ---- import com.ubermq.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.common.datagram.*; import com.ubermq.kernel.*; import com.ubermq.kernel.event.*; import com.ubermq.kernel.overflow.*; + import com.ubermq.util.*; + import java.io.*; import java.util.*; *************** *** 28,32 **** // version information public static final int UBERMQ_MAJOR_VERSION = 2; ! public static final int UBERMQ_MINOR_VERSION = 4; public static final int UBERMQ_REVISION = 0; public static final String UBERMQ_PROVIDER_NAME = "UberMQ"; --- 30,34 ---- // version information public static final int UBERMQ_MAJOR_VERSION = 2; ! public static final int UBERMQ_MINOR_VERSION = 6; public static final int UBERMQ_REVISION = 0; public static final String UBERMQ_PROVIDER_NAME = "UberMQ"; *************** *** 329,333 **** void sendEvent(ConnectionEvent event) { ! com.ubermq.Utility.getLogger().debug("sending connection event " + event); Iterator iter = eventHandlers.iterator(); --- 331,335 ---- void sendEvent(ConnectionEvent event) { ! com.ubermq.util.Utility.getLogger().debug("sending connection event " + event); Iterator iter = eventHandlers.iterator(); Index: QueueSession.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/QueueSession.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** QueueSession.java 17 Dec 2002 14:52:24 -0000 1.2 --- QueueSession.java 21 Jan 2004 02:05:24 -0000 1.3 *************** *** 1,5 **** package com.ubermq.jms.client.impl; ! import com.ubermq.jms.server.datagram.*; final class QueueSession --- 1,5 ---- package com.ubermq.jms.client.impl; ! import com.ubermq.jms.common.datagram.*; final class QueueSession |
From: <ji...@us...> - 2004-01-21 02:05:58
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/admin In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/server/admin Modified Files: MessageServerAdmin.java RemoteAdminProxy.java Log Message: bug fixes related to read/write initialization race conditions Index: MessageServerAdmin.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/admin/MessageServerAdmin.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** MessageServerAdmin.java 19 Dec 2002 22:36:22 -0000 1.4 --- MessageServerAdmin.java 21 Jan 2004 02:05:24 -0000 1.5 *************** *** 1,5 **** package com.ubermq.jms.server.admin; ! import com.ubermq.jms.server.routing.*; import java.rmi.*; import java.util.*; --- 1,5 ---- package com.ubermq.jms.server.admin; ! import com.ubermq.jms.common.routing.*; import java.rmi.*; import java.util.*; Index: RemoteAdminProxy.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/admin/RemoteAdminProxy.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** RemoteAdminProxy.java 19 Dec 2002 22:36:22 -0000 1.2 --- RemoteAdminProxy.java 21 Jan 2004 02:05:25 -0000 1.3 *************** *** 1,5 **** package com.ubermq.jms.server.admin; ! import com.ubermq.jms.server.routing.*; import java.rmi.*; import java.rmi.server.*; --- 1,5 ---- package com.ubermq.jms.server.admin; ! import com.ubermq.jms.common.routing.*; import java.rmi.*; import java.rmi.server.*; |
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/unicast In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/client/unicast Modified Files: FailoverConnectionDescriptor.java PipeConnection.java UnicastConnection.java SSLClientSession.java Log Message: bug fixes related to read/write initialization race conditions Index: FailoverConnectionDescriptor.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/unicast/FailoverConnectionDescriptor.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** FailoverConnectionDescriptor.java 12 Jan 2004 19:24:25 -0000 1.6 --- FailoverConnectionDescriptor.java 21 Jan 2004 02:05:24 -0000 1.7 *************** *** 95,99 **** } ! com.ubermq.Utility.getLogger().debug("parsed failover URL as " + connections); // return it. if there is only one URL, return a fail-fast URL. --- 95,99 ---- } ! com.ubermq.util.Utility.getLogger().debug("parsed failover URL as " + connections); // return it. if there is only one URL, return a fail-fast URL. *************** *** 132,136 **** // choose a descriptor InternetConnectionDescriptor icd = ((InternetConnectionDescriptor)selector.choose(connections)); ! com.ubermq.Utility.getLogger().debug("failover choosing " + icd); return icd.getAddress(); --- 132,136 ---- // choose a descriptor InternetConnectionDescriptor icd = ((InternetConnectionDescriptor)selector.choose(connections)); ! com.ubermq.util.Utility.getLogger().debug("failover choosing " + icd); return icd.getAddress(); Index: PipeConnection.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/unicast/PipeConnection.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** PipeConnection.java 24 Jan 2003 15:34:47 -0000 1.8 --- PipeConnection.java 21 Jan 2004 02:05:24 -0000 1.9 *************** *** 1,13 **** package com.ubermq.jms.client.unicast; import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; ! import com.ubermq.jms.server.datagram.*; ! ! import com.ubermq.jms.client.proc.ClientProc; ! import com.ubermq.jms.server.MessageServer; ! import com.ubermq.jms.server.datagram.impl.DatagramFactory; ! import com.ubermq.kernel.IDatagramFactory; ! import java.io.IOException; /** --- 1,11 ---- package com.ubermq.jms.client.unicast; + import java.io.*; + import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; ! import com.ubermq.jms.client.proc.*; ! import com.ubermq.jms.common.datagram.*; ! import com.ubermq.jms.common.datagram.impl.*; /** Index: UnicastConnection.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/unicast/UnicastConnection.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** UnicastConnection.java 24 Jan 2003 15:34:48 -0000 1.2 --- UnicastConnection.java 21 Jan 2004 02:05:24 -0000 1.3 *************** *** 1,12 **** package com.ubermq.jms.client.unicast; import EDU.oswego.cs.dl.util.concurrent.*; import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; import com.ubermq.jms.client.proc.*; ! import com.ubermq.jms.server.datagram.*; ! import com.ubermq.jms.server.datagram.impl.*; import com.ubermq.kernel.*; - import java.io.*; /** --- 1,14 ---- package com.ubermq.jms.client.unicast; + import java.io.*; + import EDU.oswego.cs.dl.util.concurrent.*; + import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; import com.ubermq.jms.client.proc.*; ! import com.ubermq.jms.common.datagram.*; ! import com.ubermq.jms.common.datagram.impl.*; import com.ubermq.kernel.*; /** Index: SSLClientSession.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/unicast/SSLClientSession.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** SSLClientSession.java 24 Jan 2003 15:34:48 -0000 1.1 --- SSLClientSession.java 21 Jan 2004 02:05:24 -0000 1.2 *************** *** 1,12 **** package com.ubermq.jms.client.unicast; - import com.ubermq.jms.client.*; - import com.ubermq.jms.client.impl.*; - import com.ubermq.jms.server.ssl.*; - import com.ubermq.kernel.*; import java.net.*; import java.nio.channels.*; import javax.net.ssl.*; /** * This client session extends the abstract base client session to make socket --- 1,14 ---- package com.ubermq.jms.client.unicast; import java.net.*; import java.nio.channels.*; + import javax.net.ssl.*; + import com.ubermq.jms.client.*; + import com.ubermq.jms.client.impl.*; + import com.ubermq.jms.common.ssl.*; + import com.ubermq.kernel.*; + /** * This client session extends the abstract base client session to make socket |
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/cluster In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/server/cluster Modified Files: ClusterTopicConnection.java ClusterPropagation.java ClusterTopicConnectionFactory.java Log Message: bug fixes related to read/write initialization race conditions Index: ClusterTopicConnection.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/cluster/ClusterTopicConnection.java,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 *** ClusterTopicConnection.java 31 Jan 2003 23:21:16 -0000 1.12 --- ClusterTopicConnection.java 21 Jan 2004 02:05:24 -0000 1.13 *************** *** 4,9 **** import com.ubermq.jms.client.unicast.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.server.datagram.*; ! import com.ubermq.jms.server.datagram.control.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; --- 4,9 ---- import com.ubermq.jms.client.unicast.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.common.datagram.*; ! import com.ubermq.jms.common.datagram.control.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; Index: ClusterPropagation.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/cluster/ClusterPropagation.java,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 *** ClusterPropagation.java 12 Jan 2004 19:24:25 -0000 1.12 --- ClusterPropagation.java 21 Jan 2004 02:05:24 -0000 1.13 *************** *** 41,45 **** catch(Exception x) { ! com.ubermq.Utility.getLogger().error("", x);; } } --- 41,45 ---- catch(Exception x) { ! com.ubermq.util.Utility.getLogger().error("", x);; } } *************** *** 69,73 **** } catch (JMSException e) { ! com.ubermq.Utility.getLogger().error("", e);; } } --- 69,73 ---- } catch (JMSException e) { ! com.ubermq.util.Utility.getLogger().error("", e);; } } Index: ClusterTopicConnectionFactory.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/cluster/ClusterTopicConnectionFactory.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** ClusterTopicConnectionFactory.java 31 Jan 2003 23:21:16 -0000 1.8 --- ClusterTopicConnectionFactory.java 21 Jan 2004 02:05:24 -0000 1.9 *************** *** 22,26 **** public ClusterTopicConnectionFactory(String url) { ! this.icd = new SimpleInternetConnectionDescriptor(url, com.ubermq.jms.server.MessageServer.DEFAULT_PORT); } --- 22,26 ---- public ClusterTopicConnectionFactory(String url) { ! this.icd = new SimpleInternetConnectionDescriptor(url, com.ubermq.jms.common.MessageConstants.DEFAULT_PORT); } |
From: <ji...@us...> - 2004-01-21 02:05:57
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/server/journal Modified Files: DurableSubscriptionProxy.java ISettingsRepository.java IJournal.java Log Message: bug fixes related to read/write initialization race conditions Index: DurableSubscriptionProxy.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal/DurableSubscriptionProxy.java,v retrieving revision 1.24 retrieving revision 1.25 diff -C2 -d -r1.24 -r1.25 *** DurableSubscriptionProxy.java 12 Jan 2004 19:24:25 -0000 1.24 --- DurableSubscriptionProxy.java 21 Jan 2004 02:05:23 -0000 1.25 *************** *** 4,11 **** import com.ubermq.jms.client.DeliveryMode; import com.ubermq.jms.server.*; ! import com.ubermq.jms.server.datagram.*; import com.ubermq.jms.server.journal.impl.*; ! import com.ubermq.jms.server.routing.*; ! import com.ubermq.jms.server.routing.impl.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; --- 4,11 ---- import com.ubermq.jms.client.DeliveryMode; import com.ubermq.jms.server.*; ! import com.ubermq.jms.common.datagram.*; import com.ubermq.jms.server.journal.impl.*; ! import com.ubermq.jms.common.routing.*; ! import com.ubermq.jms.common.routing.impl.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; *************** *** 179,183 **** catch(Throwable ise) { ! com.ubermq.Utility.getLogger().error("", ise); } } --- 179,183 ---- catch(Throwable ise) { ! com.ubermq.util.Utility.getLogger().error("", ise); } } *************** *** 236,240 **** catch (IOException e) { ! com.ubermq.Utility.getLogger().error("", e); this.displayName = name; } --- 236,240 ---- catch (IOException e) { ! com.ubermq.util.Utility.getLogger().error("", e); this.displayName = name; } *************** *** 343,347 **** private void ack(MessageId id) { ! com.ubermq.Utility.getLogger().debug("ack " + id ); journal.ack(id); } --- 343,347 ---- private void ack(MessageId id) { ! com.ubermq.util.Utility.getLogger().debug("ack " + id ); journal.ack(id); } *************** *** 353,357 **** public void process(IConnectionInfo conn, IDatagram read) { ! com.ubermq.Utility.getLogger().debug("recovering " + read); if (read instanceof IMessageDatagram) --- 353,357 ---- public void process(IConnectionInfo conn, IDatagram read) { ! com.ubermq.util.Utility.getLogger().debug("recovering " + read); if (read instanceof IMessageDatagram) *************** *** 403,407 **** else { ! com.ubermq.Utility.getLogger().debug("journaled message in disconnected mode"); } } --- 403,407 ---- else { ! com.ubermq.util.Utility.getLogger().debug("journaled message in disconnected mode"); } } Index: ISettingsRepository.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal/ISettingsRepository.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** ISettingsRepository.java 20 Sep 2002 20:59:32 -0000 1.1 --- ISettingsRepository.java 21 Jan 2004 02:05:23 -0000 1.2 *************** *** 1,6 **** package com.ubermq.jms.server.journal; - import java.nio.*; - /** * An interface to a journaling mechanism --- 1,4 ---- Index: IJournal.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal/IJournal.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** IJournal.java 18 Oct 2002 16:59:30 -0000 1.6 --- IJournal.java 21 Jan 2004 02:05:23 -0000 1.7 *************** *** 1,5 **** package com.ubermq.jms.server.journal; ! import com.ubermq.jms.server.datagram.MessageId; import com.ubermq.kernel.DatagramSink; import com.ubermq.kernel.IMessageProcessor; --- 1,5 ---- package com.ubermq.jms.server.journal; ! import com.ubermq.jms.common.datagram.MessageId; import com.ubermq.kernel.DatagramSink; import com.ubermq.kernel.IMessageProcessor; |
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal/impl In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/server/journal/impl Modified Files: FailoverArbiter.java AbstractMultipleArbiter.java BinarySettingsRepository.java SimpleJournal.java XMLSettingsRepository.java Added Files: ArbiterTestCase.java Log Message: bug fixes related to read/write initialization race conditions --- NEW FILE: ArbiterTestCase.java --- /* * Contents copyright 2002-03 Rhombus Technologies. */ package com.ubermq.jms.server.journal.impl; import java.io.*; import junit.framework.*; import EDU.oswego.cs.dl.util.concurrent.*; import com.ubermq.jms.client.test.DurableTestCase.*; import com.ubermq.jms.common.datagram.impl.*; import com.ubermq.jms.server.journal.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; /** * * @author jp */ public class ArbiterTestCase extends TestCase { /** * @param arg0 */ public ArbiterTestCase(String arg0) { super(arg0); } public void testArbiters() throws Exception { IDatagram ad = new AckDatagram(); IOverflowHandler h = new DropIncoming(); SynchronizedInt one = new SynchronizedInt(0), two = new SynchronizedInt(0), three = new SynchronizedInt(0); DatagramSink nodeOne = new IncrementOutputNode(one), nodeTwo = new IncrementOutputNode(two), nodeThree = new IncrementOutputNode(three); // test singelton DurableConnectionArbiter a = new SingletonArbiter(); a.connect(nodeOne); a.output(ad, h); Assert.assertEquals(one.get(), 1); a.disconnect(nodeOne); Assert.assertTrue(!a.isOpen()); try { a.output(ad, h); Assert.assertTrue(false); } catch(IOException iox) { Assert.assertTrue(true); } Assert.assertEquals(one.get(), 1); a.connect(nodeOne); a.output(ad, h); Assert.assertEquals(one.get(), 2); a.connect(nodeTwo); a.output(ad, h); Assert.assertEquals(one.get() + two.get(), 3); int oldOne = one.get(); a.disconnect(nodeOne); a.output(ad, h); Assert.assertEquals(oldOne, one.get()); Assert.assertEquals(one.get() + two.get(), 4); // ok test round robin one.set(0); two.set(0); three.set(0); a = new RoundRobinArbiter(); a.connect(nodeOne); a.connect(nodeTwo); a.connect(nodeThree); final int N = 21; for (int i = 0; i < N; i++) { a.output(ad, h); } Assert.assertTrue(one.get() == two.get() && two.get() == three.get() && three.get() == N/3); one.set(0); two.set(0); three.set(0); a.disconnect(nodeThree); a.output(ad,h); a.output(ad,h); Assert.assertEquals(one.get(), 1); Assert.assertEquals(two.get(), 1); // test failover one.set(0); two.set(0); three.set(0); a = new FailoverArbiter(); Assert.assertTrue(!a.isOpen()); a.connect(nodeOne); a.connect(nodeTwo); Assert.assertTrue(a.isOpen()); a.output(ad, h); a.output(ad, h); a.output(ad, h); Assert.assertEquals(one.get(), 3); Assert.assertEquals(two.get(), 0); ((IncrementOutputNode)nodeOne).setOpen(false); Assert.assertTrue(a.isOpen()); for (int i = 0; i < 3; i++) { try { a.output(ad, h); } catch(IOException iox) { Assert.assertTrue(a.isOpen()); a.output(ad, h); } } Assert.assertTrue(a.isOpen()); Assert.assertEquals(one.get(), 3); Assert.assertEquals(two.get(), 3); ((IncrementOutputNode)nodeOne).setOpen(true); } } Index: FailoverArbiter.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal/impl/FailoverArbiter.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** FailoverArbiter.java 20 Dec 2002 14:54:30 -0000 1.4 --- FailoverArbiter.java 21 Jan 2004 02:05:23 -0000 1.5 *************** *** 2,6 **** import com.ubermq.jms.server.journal.*; ! import com.ubermq.jms.server.routing.impl.*; import com.ubermq.kernel.*; import java.io.*; --- 2,6 ---- import com.ubermq.jms.server.journal.*; ! import com.ubermq.jms.common.routing.impl.*; import com.ubermq.kernel.*; import java.io.*; Index: AbstractMultipleArbiter.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal/impl/AbstractMultipleArbiter.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** AbstractMultipleArbiter.java 20 Dec 2002 14:54:30 -0000 1.7 --- AbstractMultipleArbiter.java 21 Jan 2004 02:05:23 -0000 1.8 *************** *** 2,6 **** import com.ubermq.jms.server.journal.*; ! import com.ubermq.jms.server.routing.impl.*; import com.ubermq.kernel.*; import java.io.*; --- 2,6 ---- import com.ubermq.jms.server.journal.*; ! import com.ubermq.jms.common.routing.impl.*; import com.ubermq.kernel.*; import java.io.*; Index: BinarySettingsRepository.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal/impl/BinarySettingsRepository.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** BinarySettingsRepository.java 12 Jan 2004 19:24:25 -0000 1.3 --- BinarySettingsRepository.java 21 Jan 2004 02:05:23 -0000 1.4 *************** *** 54,58 **** catch (java.io.IOException e) { ! com.ubermq.Utility.getLogger().error("", e); } } --- 54,58 ---- catch (java.io.IOException e) { ! com.ubermq.util.Utility.getLogger().error("", e); } } *************** *** 72,76 **** } catch (Exception e) { ! com.ubermq.Utility.getLogger().error("", e); } } --- 72,76 ---- } catch (Exception e) { ! com.ubermq.util.Utility.getLogger().error("", e); } } Index: SimpleJournal.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal/impl/SimpleJournal.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** SimpleJournal.java 12 Jan 2004 19:24:25 -0000 1.7 --- SimpleJournal.java 21 Jan 2004 02:05:23 -0000 1.8 *************** *** 1,5 **** package com.ubermq.jms.server.journal.impl; ! import com.ubermq.jms.server.datagram.*; import com.ubermq.kernel.*; import java.io.*; --- 1,5 ---- package com.ubermq.jms.server.journal.impl; ! import com.ubermq.jms.common.datagram.*; import com.ubermq.kernel.*; import java.io.*; *************** *** 101,105 **** } catch (IOException e) { ! com.ubermq.Utility.getLogger().error("", e); } } --- 101,105 ---- } catch (IOException e) { ! com.ubermq.util.Utility.getLogger().error("", e); } } *************** *** 114,118 **** private synchronized int handleOverflow(IDatagram d, int position, IOverflowHandler h) { ! com.ubermq.Utility.getLogger().debug("handleOverflow enetered."); // backoff the last output and compact the buffer. --- 114,118 ---- private synchronized int handleOverflow(IDatagram d, int position, IOverflowHandler h) { ! com.ubermq.util.Utility.getLogger().debug("handleOverflow enetered."); // backoff the last output and compact the buffer. *************** *** 157,161 **** journalBuffer.limit(limit); ! com.ubermq.Utility.getLogger().debug("restored checkpoint to " + journalBuffer.position() + " to " + journalBuffer.limit()); } --- 157,161 ---- journalBuffer.limit(limit); ! com.ubermq.util.Utility.getLogger().debug("restored checkpoint to " + journalBuffer.position() + " to " + journalBuffer.limit()); } *************** *** 167,171 **** journalBuffer.position(limit); ! com.ubermq.Utility.getLogger().debug("restored write position to " + journalBuffer.position()); } --- 167,171 ---- journalBuffer.position(limit); ! com.ubermq.util.Utility.getLogger().debug("restored write position to " + journalBuffer.position()); } *************** *** 174,178 **** int writing = Math.min(offsetBuffer.get(LIMIT_INDEX), Math.max(offsetBuffer.get(POSITION_INDEX), checkpt)); ! com.ubermq.Utility.getLogger().debug("checkpointing " + writing + " requested = " + checkpt); offsetBuffer.put(POSITION_INDEX, writing); logfileBuffer.force(); --- 174,178 ---- int writing = Math.min(offsetBuffer.get(LIMIT_INDEX), Math.max(offsetBuffer.get(POSITION_INDEX), checkpt)); ! com.ubermq.util.Utility.getLogger().debug("checkpointing " + writing + " requested = " + checkpt); offsetBuffer.put(POSITION_INDEX, writing); logfileBuffer.force(); *************** *** 181,185 **** private synchronized void limit(int limit) { ! com.ubermq.Utility.getLogger().debug("limit is " + limit); offsetBuffer.put(LIMIT_INDEX, limit); logfileBuffer.force(); --- 181,185 ---- private synchronized void limit(int limit) { ! com.ubermq.util.Utility.getLogger().debug("limit is " + limit); offsetBuffer.put(LIMIT_INDEX, limit); logfileBuffer.force(); *************** *** 188,192 **** private synchronized void compact(int delta) { ! com.ubermq.Utility.getLogger().debug("compacting by " + delta); offsetBuffer.put(POSITION_INDEX, offsetBuffer.get(POSITION_INDEX) + delta); offsetBuffer.put(LIMIT_INDEX, offsetBuffer.get(LIMIT_INDEX) + delta); --- 188,192 ---- private synchronized void compact(int delta) { ! com.ubermq.util.Utility.getLogger().debug("compacting by " + delta); offsetBuffer.put(POSITION_INDEX, offsetBuffer.get(POSITION_INDEX) + delta); offsetBuffer.put(LIMIT_INDEX, offsetBuffer.get(LIMIT_INDEX) + delta); *************** *** 199,203 **** public void ack(MessageId id) { ! com.ubermq.Utility.getLogger().debug("waiting for ack on: " + messageOffsets ); // we just got an ack for this message. --- 199,203 ---- public void ack(MessageId id) { ! com.ubermq.util.Utility.getLogger().debug("waiting for ack on: " + messageOffsets ); // we just got an ack for this message. Index: XMLSettingsRepository.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal/impl/XMLSettingsRepository.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** XMLSettingsRepository.java 12 Jan 2004 19:24:25 -0000 1.3 --- XMLSettingsRepository.java 21 Jan 2004 02:05:23 -0000 1.4 *************** *** 55,59 **** catch (java.io.IOException e) { ! com.ubermq.Utility.getLogger().error("", e); } } --- 55,59 ---- catch (java.io.IOException e) { ! com.ubermq.util.Utility.getLogger().error("", e); } } *************** *** 70,78 **** } catch (IOException e) { ! com.ubermq.Utility.getLogger().error("", e); entries = new HashMap(); } catch (Exception e) { ! com.ubermq.Utility.getLogger().error("", e); } } --- 70,78 ---- } catch (IOException e) { ! com.ubermq.util.Utility.getLogger().error("", e); entries = new HashMap(); } catch (Exception e) { ! com.ubermq.util.Utility.getLogger().error("", e); } } |
From: <ji...@us...> - 2004-01-21 02:05:56
|
Update of /cvsroot/ubermq/jms/src In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src Modified Files: log4j.properties Added Files: log4j-debug.properties Log Message: bug fixes related to read/write initialization race conditions --- NEW FILE: log4j-debug.properties --- ### direct log messages to stdout ### log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n ### direct messages to file hibernate.log ### #log4j.appender.file=org.apache.log4j.FileAppender #log4j.appender.file.File=hibernate.log #log4j.appender.file.layout=org.apache.log4j.PatternLayout #log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n ### set log levels - for more verbose logging change 'info' to 'debug' ## log4j.rootLogger=warn, stdout log4j.logger.net.sf.hibernate=debug log4j.logger.com.rhombus=debug log4j.logger.com.ubermq=debug ### enable the following line if you want to track down connection ### ### leakages when using DriverManagerConnectionProvider ### #log4j.logger.net.sf.hibernate.connection.DriverManagerConnectionProvider=trace ### log JDBC bind parameters ### log4j.logger.net.sf.hibernate.type=info ### log prepared statement cache activity ### log4j.logger.net.sf.hibernate.ps.PreparedStatementCache=info Index: log4j.properties =================================================================== RCS file: /cvsroot/ubermq/jms/src/log4j.properties,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** log4j.properties 15 Jan 2004 02:01:40 -0000 1.1 --- log4j.properties 21 Jan 2004 02:05:23 -0000 1.2 *************** *** 17,21 **** log4j.logger.net.sf.hibernate=error log4j.logger.com.rhombus=info ! log4j.logger.com.ubermq=info ### enable the following line if you want to track down connection ### --- 17,21 ---- log4j.logger.net.sf.hibernate=error log4j.logger.com.rhombus=info ! log4j.logger.com.ubermq=debug ### enable the following line if you want to track down connection ### |
From: <ji...@us...> - 2004-01-21 02:05:55
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/kernel/overflow In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/kernel/overflow Modified Files: ExponentialBackoff.java KillReceiver.java Log Message: bug fixes related to read/write initialization race conditions Index: ExponentialBackoff.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/overflow/ExponentialBackoff.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** ExponentialBackoff.java 12 Jan 2004 19:24:26 -0000 1.8 --- ExponentialBackoff.java 21 Jan 2004 02:05:22 -0000 1.9 *************** *** 55,59 **** { try { ! com.ubermq.Utility.getLogger().debug("overflow condition - waiting " + timeout + " ms"); Thread.sleep(timeout); } catch(InterruptedException ie) { --- 55,59 ---- { try { ! com.ubermq.util.Utility.getLogger().debug("overflow condition - waiting " + timeout + " ms"); Thread.sleep(timeout); } catch(InterruptedException ie) { Index: KillReceiver.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/overflow/KillReceiver.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** KillReceiver.java 16 Jun 2003 00:01:53 -0000 1.7 --- KillReceiver.java 21 Jan 2004 02:05:22 -0000 1.8 *************** *** 1,6 **** package com.ubermq.kernel.overflow; - import com.ubermq.Utility; import com.ubermq.kernel.*; /** --- 1,6 ---- package com.ubermq.kernel.overflow; import com.ubermq.kernel.*; + import com.ubermq.util.*; /** |
From: <ji...@us...> - 2004-01-21 02:05:31
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/ssl In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/server/ssl Modified Files: SSLProtocol.java Removed Files: IONormalizer.java Log Message: bug fixes related to read/write initialization race conditions Index: SSLProtocol.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/ssl/SSLProtocol.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** SSLProtocol.java 12 Jan 2004 19:24:26 -0000 1.7 --- SSLProtocol.java 21 Jan 2004 02:05:27 -0000 1.8 *************** *** 1,4 **** --- 1,6 ---- package com.ubermq.jms.server.ssl; import com.ubermq.jms.client.unicast.*; + import com.ubermq.jms.common.*; + import com.ubermq.jms.common.ssl.*; import com.ubermq.jms.server.*; import com.ubermq.kernel.*; *************** *** 16,23 **** implements MessageServer.Protocol { - public static final int DEFAULT_PORT = 3994; public static int getProtocolPort() { ! return Integer.valueOf(Configurator.getProperty(ServerConfig.SSL_PORT, String.valueOf(DEFAULT_PORT))).intValue(); } --- 18,24 ---- implements MessageServer.Protocol { public static int getProtocolPort() { ! return Integer.valueOf(Configurator.getProperty(ServerConfig.SSL_PORT, String.valueOf(MessageConstants.DEFAULT_SSL_PORT))).intValue(); } *************** *** 114,118 **** } catch (IOException e) { ! com.ubermq.Utility.getLogger().error("", e); return; } --- 115,119 ---- } catch (IOException e) { ! com.ubermq.util.Utility.getLogger().error("", e); return; } *************** *** 127,131 **** public String toString() { ! return (port == DEFAULT_PORT) ? "Secure UberMQ" : ("Secure UberMQ (" + port + ")"); } --- 128,132 ---- public String toString() { ! return (port == MessageConstants.DEFAULT_SSL_PORT) ? "Secure UberMQ" : ("Secure UberMQ (" + port + ")"); } *************** *** 141,145 **** try { ! if (port == DEFAULT_PORT) return URI.create("ubermqs://" + InetAddress.getLocalHost()); else --- 142,146 ---- try { ! if (port == MessageConstants.DEFAULT_SSL_PORT) return URI.create("ubermqs://" + InetAddress.getLocalHost()); else --- IONormalizer.java DELETED --- |
From: <ji...@us...> - 2004-01-21 02:05:31
|
Update of /cvsroot/ubermq/jms/src/com/ubermq In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq Removed Files: Utility.java Log Message: bug fixes related to read/write initialization race conditions --- Utility.java DELETED --- |
From: <ji...@us...> - 2004-01-21 02:05:31
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/ui/viewer In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/ui/viewer Modified Files: MessageController.java Log Message: bug fixes related to read/write initialization race conditions Index: MessageController.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/ui/viewer/MessageController.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** MessageController.java 12 Jan 2004 19:24:26 -0000 1.8 --- MessageController.java 21 Jan 2004 02:05:27 -0000 1.9 *************** *** 203,207 **** } catch (Exception e) { ! com.ubermq.Utility.getLogger().error("",e); } --- 203,207 ---- } catch (Exception e) { ! com.ubermq.util.Utility.getLogger().error("",e); } |
From: <ji...@us...> - 2004-01-21 02:05:31
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/common/ssl In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/common/ssl Added Files: IONormalizer.java Log Message: bug fixes related to read/write initialization race conditions --- NEW FILE: IONormalizer.java --- package com.ubermq.jms.common.ssl; import java.net.*; import java.nio.channels.*; import java.nio.*; import java.io.*; /** * For now, it is necessary to map SSL operations onto NIO via pipes. * This is undesirable for a lot of reasons, but there is really not * much we can do here until Sun comes up with a way to do this * from the JDK.<P> * * This could also be useful perhaps to allow HTTP or other protocol * tunneling that is implemented in the JDK. <P> * * @since 2.1 */ public class IONormalizer { private static final int BUFFER_SIZE = 4096; /** * Normalizes I/O from java.io to/from java.nio paradigms. * This method will set up the necessary threads and resources * to read and write data using a pipe on one end and a socket * on the other. * * @param fromsocket a Pipe representing traffic * going to the socket * * @param tosocket a Pipe representing traffic * coming from the socket * * @param s a Socket, which is two-way * by definition. * * @throws IOException if a failure occurs */ public static void normalize(final Pipe fromsocket, final Pipe tosocket, final Socket s) throws IOException { // get streams final InputStream in = s.getInputStream(); final OutputStream out = s.getOutputStream(); // from the pipes to the socket. we need to be // able to interrupt this thread so that it will shutdown // the pipes when necessary. final Thread writer = new Thread(new Runnable() { public void run() { try { ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE); while(!Thread.interrupted()) { int n = tosocket.source().read(buf); if (n == -1) break; else { out.write(buf.array(), 0, n); buf.clear(); } } } catch (IOException e) { ; // do nothing } } }, "IONormalizer to-socket"); // now spawn a thread to handle bringing bytes // from the socket Thread reader = new Thread(new Runnable() { public void run() { try { byte[] buf = new byte[BUFFER_SIZE]; while(!Thread.interrupted()) { int n = in.read(buf); if (n == -1) { break; } else { int len = 0; while(len < n) { len += fromsocket.sink().write(ByteBuffer.wrap(buf, len, n-len)); } } } } catch (SocketException e) { // don;t print here. it's fine that the cxn was // reset. } catch (IOException e) { com.ubermq.util.Utility.getLogger().error("", e); } finally { try { writer.interrupt(); tosocket.sink().close(); writer.join(); } catch (Exception e) { com.ubermq.util.Utility.getLogger().error("", e); } // close everything try { fromsocket.sink().close(); } catch (IOException e) { com.ubermq.util.Utility.getLogger().error("", e); } try { s.close(); } catch (IOException e) { com.ubermq.util.Utility.getLogger().error("", e); } } } }, "IONormalizer from-socket"); // start em reader.setDaemon(true); writer.setDaemon(true); reader.start(); writer.start(); } } |
From: <ji...@us...> - 2004-01-21 02:05:31
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/common/overflow In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/common/overflow Added Files: TTLOverflowHandler.java Log Message: bug fixes related to read/write initialization race conditions --- NEW FILE: TTLOverflowHandler.java --- package com.ubermq.jms.common.overflow; import com.ubermq.jms.common.datagram.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; import javax.jms.*; /** * An extension to the exponential backoff overflow handler * that potentially drops messages if we would wait for space longer than * their TTL.<P> * * This handler also looks at the delivery mode property and assumes * negligible TTL for non persistent messages.<P> */ public class TTLOverflowHandler extends ExponentialBackoff { public TTLOverflowHandler() {super();} public TTLOverflowHandler(long initial, int factor, long max, boolean shouldFail) { super(initial, factor, max, shouldFail); } public IOverflowHandler getRetryHandler() { return new TTLOverflowHandler(Math.min(maximumTimeout, timeout * factor), factor, maximumTimeout, shouldFailIfMaximumReached); } /** * If this is a message datagram, and the timeout we're about to wait is * greater or equal to the TTL, drop the message. If not, we delegate * to our superclass. */ public int overflow(IDatagram d) { if (d instanceof IMessageDatagram) { int ttl = ((Integer)((IMessageDatagram)d).getStandardProperty(IMessageDatagram.STDPROP_TTL)).intValue(); int dmode = ((Integer)((IMessageDatagram)d).getStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE)).intValue(); if (dmode == DeliveryMode.NON_PERSISTENT || (ttl > 0 && this.timeout >= ttl)) return IOverflowHandler.ACTION_IGNORE; } return super.overflow(d); } } |
From: <ji...@us...> - 2004-01-21 02:05:30
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/common In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/common Added Files: MessageConstants.java CommonConfig.java Log Message: bug fixes related to read/write initialization race conditions --- NEW FILE: MessageConstants.java --- /* * Contents copyright 2002-03 Rhombus Technologies. */ package com.ubermq.jms.common; /** * A repository for commonly used constants.<P> */ public final class MessageConstants { public static final int DEFAULT_PORT = 3999; public static final int DEFAULT_SSL_PORT = 3994; } --- NEW FILE: CommonConfig.java --- /* * Contents copyright 2002-03 Rhombus Technologies. */ package com.ubermq.jms.common; /** * Common configuration options for all JMS abstractions.<P> * @since 2.4 */ public class CommonConfig { private CommonConfig() { super(); } /** * the size of the buffer allocated for custom properties. this can be small * if you are not actively using properties, but make it large if you * are putting objects,etc. in there */ public static final String DGP_MAXIMUM_PROPERTY_LENGTH = "server.dgram.maximumprops"; } |
From: <ji...@us...> - 2004-01-21 02:05:30
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/server/proc Modified Files: DatagramProc.java EchoMessageProcessor.java Removed Files: TTLOverflowHandler.java Log Message: bug fixes related to read/write initialization race conditions Index: DatagramProc.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc/DatagramProc.java,v retrieving revision 1.40 retrieving revision 1.41 diff -C2 -d -r1.40 -r1.41 *** DatagramProc.java 12 Jan 2004 19:24:25 -0000 1.40 --- DatagramProc.java 21 Jan 2004 02:05:27 -0000 1.41 *************** *** 1,18 **** package com.ubermq.jms.server.proc; import com.ubermq.jms.client.impl.*; import com.ubermq.jms.server.*; import com.ubermq.jms.server.admin.*; - import com.ubermq.jms.server.datagram.*; - import com.ubermq.jms.server.datagram.control.*; import com.ubermq.jms.server.journal.*; import com.ubermq.jms.server.journal.impl.*; - import com.ubermq.jms.server.routing.*; - import com.ubermq.jms.server.routing.impl.*; import com.ubermq.kernel.*; - import java.io.*; - import java.lang.reflect.*; - import java.nio.*; - import java.util.*; /** --- 1,19 ---- package com.ubermq.jms.server.proc; + import java.io.*; + import java.lang.reflect.*; + import java.util.*; + import com.ubermq.jms.client.impl.*; + import com.ubermq.jms.common.datagram.*; + import com.ubermq.jms.common.datagram.control.*; + import com.ubermq.jms.common.overflow.*; + import com.ubermq.jms.common.routing.*; + import com.ubermq.jms.common.routing.impl.*; import com.ubermq.jms.server.*; import com.ubermq.jms.server.admin.*; import com.ubermq.jms.server.journal.*; import com.ubermq.jms.server.journal.impl.*; import com.ubermq.kernel.*; /** *************** *** 24,27 **** --- 25,30 ---- implements IMessageProcessor, MessageServerAdmin, IRouterStatistics { + private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(DatagramProc.class); + // the standard overflow handler for outgoing messages private static final long INITIAL_TIMEOUT = Long.valueOf(Configurator.getProperty(ServerConfig.DGP_INITIAL_TIMEOUT, "50")).longValue(); *************** *** 188,192 **** private void ack(IConnectionInfo conn, IAckDatagram ad) { ! com.ubermq.Utility.getLogger().debug("ack! " + ad.toString()); // route this using the ACK router. --- 191,195 ---- private void ack(IConnectionInfo conn, IAckDatagram ad) { ! com.ubermq.util.Utility.getLogger().debug("ack! " + ad.toString()); // route this using the ACK router. *************** *** 201,205 **** private void control(IConnectionInfo conn, IControlDatagram cd) { ! com.ubermq.Utility.getLogger().debug("control! " + cd.toString()); boolean ack = false; --- 204,208 ---- private void control(IConnectionInfo conn, IControlDatagram cd) { ! com.ubermq.util.Utility.getLogger().debug("control! " + cd.toString()); boolean ack = false; *************** *** 322,330 **** catch(Exception x) { ! com.ubermq.Utility.getLogger().error("", x); ack = false; } finally { // send the ACK of the command back to the caller // so the caller can proceed --- 325,335 ---- catch(Exception x) { ! log.error("Failed sending acknowledgement", x); ack = false; } finally { + log.debug("Sending acknowledgement " + ack + " to " + conn + " for RPC"); + // send the ACK of the command back to the caller // so the caller can proceed *************** *** 333,337 **** // show the router status ! com.ubermq.Utility.getLogger().debug(router.toString()); } --- 338,342 ---- // show the router status ! com.ubermq.util.Utility.getLogger().debug(router.toString()); } *************** *** 396,403 **** --- 401,411 ---- // output the ACK or NACK, as appropriate. + log.debug("sending " + conn + " message ID " + md.getMessageId() + " ack=" + !ad.isNegativeAck()); conn.output(ad, overflow); } catch (IOException e) { + log.debug("could not send message acknowledgement", e); + // connection failed. remove(conn); *************** *** 414,417 **** --- 422,427 ---- catch (IOException e) { + log.debug("could not send RPC ack due to I/O problem", e); + // looks like the caller went down. // blow him away. *************** *** 453,457 **** // this is a severe error if we cannot open the durable logfile // we should report it to the user. ! com.ubermq.Utility.getLogger().fatal(e.getMessage()); return null; } --- 463,467 ---- // this is a severe error if we cannot open the durable logfile // we should report it to the user. ! com.ubermq.util.Utility.getLogger().fatal(e.getMessage()); return null; } *************** *** 516,520 **** private void restoreDurableSubscriber(DurableSubscriptionProxy p) { ! com.ubermq.Utility.getLogger().info("restoring durable subscriber " + p.getName()); // subscribe the proxy to the topics. --- 526,530 ---- private void restoreDurableSubscriber(DurableSubscriptionProxy p) { ! com.ubermq.util.Utility.getLogger().info("restoring durable subscriber " + p.getName()); // subscribe the proxy to the topics. Index: EchoMessageProcessor.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc/EchoMessageProcessor.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** EchoMessageProcessor.java 12 Jan 2004 19:24:25 -0000 1.6 --- EchoMessageProcessor.java 21 Jan 2004 02:05:27 -0000 1.7 *************** *** 21,25 **** public void accept(IConnectionInfo ci) { ! com.ubermq.Utility.getLogger().info("accepted " + ci); allConnections.add(ci); } --- 21,25 ---- public void accept(IConnectionInfo ci) { ! com.ubermq.util.Utility.getLogger().info("accepted " + ci); allConnections.add(ci); } *************** *** 40,44 **** if (!conn.equals(sender)) { ! com.ubermq.Utility.getLogger().info("outputting " + d + " to " + conn); try { --- 40,44 ---- if (!conn.equals(sender)) { ! com.ubermq.util.Utility.getLogger().info("outputting " + d + " to " + conn); try { *************** *** 47,51 **** catch (java.io.IOException e) { ! com.ubermq.Utility.getLogger().error("", e); } } --- 47,51 ---- catch (java.io.IOException e) { ! com.ubermq.util.Utility.getLogger().error("", e); } } --- TTLOverflowHandler.java DELETED --- |
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/test In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/client/test Modified Files: DurableTestCase.java DurableTest.java RegressionTestCase.java StatisticsTest.java Removed Files: Oldclient.java jmstest.java Log Message: bug fixes related to read/write initialization race conditions Index: DurableTestCase.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/test/DurableTestCase.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** DurableTestCase.java 31 Jan 2003 23:21:14 -0000 1.8 --- DurableTestCase.java 21 Jan 2004 02:05:26 -0000 1.9 *************** *** 3,10 **** import EDU.oswego.cs.dl.util.concurrent.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.server.*; ! import com.ubermq.jms.server.datagram.impl.*; ! import com.ubermq.jms.server.journal.*; ! import com.ubermq.jms.server.journal.impl.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; --- 3,7 ---- import EDU.oswego.cs.dl.util.concurrent.*; import com.ubermq.jms.client.*; ! import com.ubermq.jms.common.datagram.impl.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; *************** *** 155,267 **** } ! public void testArbiters() ! throws Exception ! { ! IDatagram ad = new AckDatagram(); ! IOverflowHandler h = new DropIncoming(); ! ! SynchronizedInt one = new SynchronizedInt(0), ! two = new SynchronizedInt(0), ! three = new SynchronizedInt(0); ! ! DatagramSink nodeOne = new IncrementOutputNode(one), ! nodeTwo = new IncrementOutputNode(two), ! nodeThree = new IncrementOutputNode(three); ! ! // test singelton ! DurableConnectionArbiter a = new SingletonArbiter(); ! a.connect(nodeOne); ! a.output(ad, h); ! Assert.assertEquals(one.get(), 1); ! ! a.disconnect(nodeOne); ! Assert.assertTrue(!a.isOpen()); ! try { ! a.output(ad, h); ! Assert.assertTrue(false); ! } catch(IOException iox) { ! Assert.assertTrue(true); ! } ! Assert.assertEquals(one.get(), 1); ! ! a.connect(nodeOne); ! a.output(ad, h); ! Assert.assertEquals(one.get(), 2); ! ! a.connect(nodeTwo); ! a.output(ad, h); ! Assert.assertEquals(one.get() + two.get(), 3); ! ! int oldOne = one.get(); ! a.disconnect(nodeOne); ! a.output(ad, h); ! Assert.assertEquals(oldOne, one.get()); ! Assert.assertEquals(one.get() + two.get(), 4); ! ! // ok test round robin ! one.set(0); ! two.set(0); ! three.set(0); ! ! a = new RoundRobinArbiter(); ! a.connect(nodeOne); ! a.connect(nodeTwo); ! a.connect(nodeThree); ! ! final int N = 21; ! for (int i = 0; i < N; i++) ! { ! a.output(ad, h); ! } ! ! Assert.assertTrue(one.get() == two.get() && ! two.get() == three.get() && ! three.get() == N/3); ! ! one.set(0); ! two.set(0); ! three.set(0); ! a.disconnect(nodeThree); ! a.output(ad,h); ! a.output(ad,h); ! Assert.assertEquals(one.get(), 1); ! Assert.assertEquals(two.get(), 1); ! ! // test failover ! one.set(0); ! two.set(0); ! three.set(0); ! ! a = new FailoverArbiter(); ! Assert.assertTrue(!a.isOpen()); ! a.connect(nodeOne); ! a.connect(nodeTwo); ! Assert.assertTrue(a.isOpen()); ! ! a.output(ad, h); ! a.output(ad, h); ! a.output(ad, h); ! Assert.assertEquals(one.get(), 3); ! Assert.assertEquals(two.get(), 0); ! ! ((IncrementOutputNode)nodeOne).setOpen(false); ! Assert.assertTrue(a.isOpen()); ! for (int i = 0; i < 3; i++) ! { ! try { ! a.output(ad, h); ! } catch(IOException iox) { ! Assert.assertTrue(a.isOpen()); ! a.output(ad, h); ! } ! } ! Assert.assertTrue(a.isOpen()); ! Assert.assertEquals(one.get(), 3); ! Assert.assertEquals(two.get(), 3); ! ! ((IncrementOutputNode)nodeOne).setOpen(true); ! } ! ! private static final class IncrementOutputNode implements DatagramSink { --- 152,156 ---- } ! public static final class IncrementOutputNode implements DatagramSink { *************** *** 269,273 **** private boolean open; ! private IncrementOutputNode(SynchronizedInt i) { this.i = i; --- 158,162 ---- private boolean open; ! public IncrementOutputNode(SynchronizedInt i) { this.i = i; *************** *** 294,298 **** } ! void setOpen(boolean f) {this.open = f;} } --- 183,190 ---- } ! public void setOpen(boolean f) ! { ! this.open = f; ! } } Index: DurableTest.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/test/DurableTest.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** DurableTest.java 12 Jan 2004 19:24:24 -0000 1.6 --- DurableTest.java 21 Jan 2004 02:05:26 -0000 1.7 *************** *** 80,84 **** } } ! catch (Exception e) {com.ubermq.Utility.getLogger().error("", e);;} } --- 80,84 ---- } } ! catch (Exception e) {com.ubermq.util.Utility.getLogger().error("", e);;} } Index: RegressionTestCase.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/test/RegressionTestCase.java,v retrieving revision 1.30 retrieving revision 1.31 diff -C2 -d -r1.30 -r1.31 *** RegressionTestCase.java 12 Jan 2004 19:24:24 -0000 1.30 --- RegressionTestCase.java 21 Jan 2004 02:05:26 -0000 1.31 *************** *** 1,21 **** package com.ubermq.jms.client.test; - import EDU.oswego.cs.dl.util.concurrent.*; - import com.ubermq.jms.client.*; - import com.ubermq.jms.client.impl.*; - import com.ubermq.jms.server.*; - import com.ubermq.jms.server.routing.impl.*; - import com.ubermq.kernel.*; - import com.ubermq.kernel.event.*; import java.util.*; import java.util.regex.*; - import javax.jms.*; - import junit.framework.*; - import sun.security.krb5.*; import javax.jms.Connection; import javax.jms.Session; import javax.jms.TopicSession; /** * A JUnit test case that exercises significant portions of the --- 1,21 ---- package com.ubermq.jms.client.test; import java.util.*; import java.util.regex.*; + import javax.jms.*; import javax.jms.Connection; import javax.jms.Session; import javax.jms.TopicSession; + import junit.framework.*; + import EDU.oswego.cs.dl.util.concurrent.*; + + import com.ubermq.jms.client.*; + import com.ubermq.jms.client.impl.*; + import com.ubermq.jms.common.routing.impl.*; + import com.ubermq.kernel.*; + import com.ubermq.kernel.event.*; + /** * A JUnit test case that exercises significant portions of the *************** *** 25,28 **** --- 25,31 ---- extends TestCase { + private static final org.apache.log4j.Logger log = + org.apache.log4j.Logger.getLogger(RegressionTestCase.class); + public static TestSuite suite() { return new TestSuite(RegressionTestCase.class); *************** *** 323,327 **** } catch (JMSException e) { ! com.ubermq.Utility.getLogger().error("", e);; } } --- 326,330 ---- } catch (JMSException e) { ! com.ubermq.util.Utility.getLogger().error("", e);; } } *************** *** 503,530 **** /** - * Tests pipe connections and makes sure they work w/ themselves, and others. - */ - public void testPipes() - throws JMSException - { - Properties p = new Properties(); - p.put("server.port", "5003"); - - MessageServer ms = new MessageServer(p); - ms.addStandardProtocols(); - ms.run(); - - TopicConnectionFactory localFactory = new PipeConnectionFactory(ms), - remoteFactory = new URLTopicConnectionFactory(ms.getServiceUrl()); - - System.out.println("testPipes"); - sendAndReceive(localFactory, remoteFactory, "A", "A", null, 20, 20); - sendAndReceive(localFactory, "hello", "hello", null, 20, 20); - sendAndReceive(localFactory, "B", "B", "where ordinal != 2", 10, 9); - sendAndReceive(remoteFactory, localFactory, "B", "B", null, 20, 20); - - } - - /** * Tests SSL connections and makes sure they work w/ themselves, and others. */ --- 506,509 ---- *************** *** 542,576 **** sendAndReceive(remoteFactory, localFactory, "B", "B", null, 20, 20); } - - /** - * Tests Clustering. - */ - public void testClustering() - throws JMSException - { - Properties p = new Properties(); - p.put("server.port", "5001"); - - MessageServer ms = new MessageServer(p); - ms.addStandardProtocols(); - ms.run(); - - p.put("server.port", "5002"); - p.put("clustering.enable", "true"); - p.put("clustering.forward", "ubermq://localhost:5001"); - - MessageServer ms2 = new MessageServer(p); - ms2.addStandardProtocols(); - ms2.run(); - - TopicConnectionFactory localFactory = new URLTopicConnectionFactory("ubermq://localhost:5001"), - remoteFactory = new URLTopicConnectionFactory("ubermq://localhost:5002"); - - sendAndReceive(localFactory, remoteFactory, "A", "A", null, 20, 20); - sendAndReceive(localFactory, "hello", "hello", null, 20, 20); - sendAndReceive(localFactory, "B", "B", "where ordinal != 2", 10, 9); - sendAndReceive(remoteFactory, localFactory, "B", "B", null, 20, 20); - - } /** * Tests queue APIs. --- 521,524 ---- *************** *** 599,605 **** /** ! * Tests overflow handling for catatonic message listeners. */ ! public void testOverflows() throws Exception { --- 547,554 ---- /** ! * Tests overflow handling for catatonic message listeners. This is named ! * particuarly so that it does not get called in standard regression tests. */ ! public void xtestOverflows() throws Exception { *************** *** 637,641 **** } catch (Exception e) { ! com.ubermq.Utility.getLogger().error("", e);; } } --- 586,590 ---- } catch (Exception e) { ! com.ubermq.util.Utility.getLogger().error("", e);; } } *************** *** 664,668 **** catch (InterruptedException e) { ! com.ubermq.Utility.getLogger().error("", e);; } --- 613,617 ---- catch (InterruptedException e) { ! com.ubermq.util.Utility.getLogger().error("", e);; } *************** *** 836,840 **** Assert.assertNotNull(m); m.acknowledge(); ! System.out.println("got message " + i + " ordinal was " + m.getIntProperty("ordinal")); } } --- 785,789 ---- Assert.assertNotNull(m); m.acknowledge(); ! log.debug("got message " + i + " ordinal was " + m.getIntProperty("ordinal")); } } Index: StatisticsTest.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/test/StatisticsTest.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** StatisticsTest.java 12 Jan 2004 19:24:24 -0000 1.5 --- StatisticsTest.java 21 Jan 2004 02:05:26 -0000 1.6 *************** *** 4,11 **** import javax.jms.*; import junit.framework.*; - import com.ubermq.Utility; import EDU.oswego.cs.dl.util.concurrent.ClockDaemon; import com.ubermq.jms.client.UnicastConnectionFactory; /** --- 4,11 ---- import javax.jms.*; import junit.framework.*; import EDU.oswego.cs.dl.util.concurrent.ClockDaemon; import com.ubermq.jms.client.UnicastConnectionFactory; + import com.ubermq.util.*; /** *************** *** 43,47 **** } catch (Exception e) { ! com.ubermq.Utility.getLogger().error("", e);; } } --- 43,47 ---- } catch (Exception e) { ! com.ubermq.util.Utility.getLogger().error("", e);; } } *************** *** 143,147 **** } catch (Exception e) { ! com.ubermq.Utility.getLogger().error("", e);; } } --- 143,147 ---- } catch (Exception e) { ! com.ubermq.util.Utility.getLogger().error("", e);; } } --- Oldclient.java DELETED --- --- jmstest.java DELETED --- |
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/client Modified Files: UnicastTopicConnectionFactory.java MulticastTopicConnectionFactory.java PipeTopicConnectionFactory.java IClientProcessor.java SSLConnectionFactory.java URLConnectionFactory.java IAcknowledgeHandler.java PipeConnectionFactory.java UnicastConnectionFactory.java Log Message: bug fixes related to read/write initialization race conditions Index: UnicastTopicConnectionFactory.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/UnicastTopicConnectionFactory.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** UnicastTopicConnectionFactory.java 27 Mar 2003 15:19:40 -0000 1.9 --- UnicastTopicConnectionFactory.java 21 Jan 2004 02:05:26 -0000 1.10 *************** *** 31,35 **** * The method specification is ignored.<P> * - * @see com.ubermq.jms.server.MessageServer.DEFAULT_PORT * @see com.ubermq.jms.client.unicast.FailoverConnectionDescriptor * @param host a URL, or the name of the host to connect to. --- 31,34 ---- Index: MulticastTopicConnectionFactory.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/MulticastTopicConnectionFactory.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** MulticastTopicConnectionFactory.java 12 Dec 2002 18:01:47 -0000 1.6 --- MulticastTopicConnectionFactory.java 21 Jan 2004 02:05:26 -0000 1.7 *************** *** 31,35 **** { this(uri.getHost(), ! (uri.getPort() > 0) ? uri.getPort() : com.ubermq.jms.server.MessageServer.DEFAULT_PORT); } --- 31,35 ---- { this(uri.getHost(), ! (uri.getPort() > 0) ? uri.getPort() : com.ubermq.jms.common.MessageConstants.DEFAULT_PORT); } Index: PipeTopicConnectionFactory.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/PipeTopicConnectionFactory.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** PipeTopicConnectionFactory.java 12 Dec 2002 18:01:47 -0000 1.3 --- PipeTopicConnectionFactory.java 21 Jan 2004 02:05:26 -0000 1.4 *************** *** 1,7 **** package com.ubermq.jms.client; ! import com.ubermq.jms.client.unicast.PipeConnection; ! import com.ubermq.jms.server.MessageServer; ! import javax.jms.JMSException; /** --- 1,5 ---- package com.ubermq.jms.client; ! import com.ubermq.jms.client.unicast.*; /** *************** *** 21,30 **** public static final long serialVersionUID = 1L; ! /** ! * @deprecated ! */ ! public PipeTopicConnectionFactory(MessageServer s) { ! super(s); } } --- 19,25 ---- public static final long serialVersionUID = 1L; ! public PipeTopicConnectionFactory(PipeEndpoint pe) { ! super(pe); } } Index: IClientProcessor.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/IClientProcessor.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** IClientProcessor.java 19 Dec 2002 22:36:21 -0000 1.10 --- IClientProcessor.java 21 Jan 2004 02:05:26 -0000 1.11 *************** *** 1,5 **** package com.ubermq.jms.client; ! import com.ubermq.jms.server.datagram.*; import com.ubermq.kernel.*; import java.io.*; --- 1,5 ---- package com.ubermq.jms.client; ! import com.ubermq.jms.common.datagram.*; import com.ubermq.kernel.*; import java.io.*; Index: SSLConnectionFactory.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/SSLConnectionFactory.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** SSLConnectionFactory.java 24 Jan 2003 15:34:43 -0000 1.1 --- SSLConnectionFactory.java 21 Jan 2004 02:05:26 -0000 1.2 *************** *** 3,7 **** import com.ubermq.jms.client.impl.*; import com.ubermq.jms.client.unicast.*; ! import com.ubermq.jms.server.datagram.impl.*; import java.net.*; import javax.jms.*; --- 3,7 ---- import com.ubermq.jms.client.impl.*; import com.ubermq.jms.client.unicast.*; ! import com.ubermq.jms.common.datagram.impl.*; import java.net.*; import javax.jms.*; *************** *** 26,30 **** * The method specification is ignored.<P> * - * @see com.ubermq.jms.server.MessageServer.DEFAULT_PORT * @see com.ubermq.jms.client.unicast.FailoverConnectionDescriptor * @param host a URL, or the name of the host to connect to. --- 26,29 ---- *************** *** 32,41 **** public SSLConnectionFactory(String url) { ! this.icd = FailoverConnectionDescriptor.parseFailoverSpec(url, com.ubermq.jms.server.ssl.SSLProtocol.DEFAULT_PORT); } public SSLConnectionFactory(URI uri) { ! this.icd = FailoverConnectionDescriptor.parseFailoverSpec(uri, com.ubermq.jms.server.ssl.SSLProtocol.DEFAULT_PORT); } --- 31,40 ---- public SSLConnectionFactory(String url) { ! this.icd = FailoverConnectionDescriptor.parseFailoverSpec(url, com.ubermq.jms.common.MessageConstants.DEFAULT_SSL_PORT); } public SSLConnectionFactory(URI uri) { ! this.icd = FailoverConnectionDescriptor.parseFailoverSpec(uri, com.ubermq.jms.common.MessageConstants.DEFAULT_SSL_PORT); } Index: URLConnectionFactory.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/URLConnectionFactory.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** URLConnectionFactory.java 12 Jan 2004 19:24:24 -0000 1.3 --- URLConnectionFactory.java 21 Jan 2004 02:05:26 -0000 1.4 *************** *** 2,5 **** --- 2,7 ---- import com.ubermq.*; + import com.ubermq.util.*; + import java.io.*; import java.lang.reflect.*; *************** *** 97,101 **** } catch (Exception e) { ! com.ubermq.Utility.getLogger().error("", e);; } --- 99,103 ---- } catch (Exception e) { ! com.ubermq.util.Utility.getLogger().error("", e);; } *************** *** 201,205 **** } catch (Exception e) { ! com.ubermq.Utility.getLogger().error("", e);; // this shouldn't happen - we should have verified --- 203,207 ---- } catch (Exception e) { ! com.ubermq.util.Utility.getLogger().error("", e);; // this shouldn't happen - we should have verified Index: IAcknowledgeHandler.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/IAcknowledgeHandler.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** IAcknowledgeHandler.java 30 Jan 2003 14:42:13 -0000 1.4 --- IAcknowledgeHandler.java 21 Jan 2004 02:05:26 -0000 1.5 *************** *** 1,5 **** package com.ubermq.jms.client; ! import com.ubermq.jms.server.datagram.*; import java.io.*; --- 1,5 ---- package com.ubermq.jms.client; ! import com.ubermq.jms.common.datagram.*; import java.io.*; Index: PipeConnectionFactory.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/PipeConnectionFactory.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** PipeConnectionFactory.java 12 Jan 2004 19:24:24 -0000 1.6 --- PipeConnectionFactory.java 21 Jan 2004 02:05:26 -0000 1.7 *************** *** 30,34 **** catch(Exception x) { ! com.ubermq.Utility.getLogger().error("", x);; throw new JMSException(x.getMessage()); } --- 30,34 ---- catch(Exception x) { ! com.ubermq.util.Utility.getLogger().error("", x);; throw new JMSException(x.getMessage()); } Index: UnicastConnectionFactory.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/UnicastConnectionFactory.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** UnicastConnectionFactory.java 24 Jan 2003 15:34:43 -0000 1.3 --- UnicastConnectionFactory.java 21 Jan 2004 02:05:26 -0000 1.4 *************** *** 28,32 **** * The method specification is ignored.<P> * - * @see com.ubermq.jms.server.MessageServer.DEFAULT_PORT * @see com.ubermq.jms.client.unicast.FailoverConnectionDescriptor * @param host a URL, or the name of the host to connect to. --- 28,31 ---- *************** *** 34,43 **** public UnicastConnectionFactory(String url) { ! this.icd = FailoverConnectionDescriptor.parseFailoverSpec(url, com.ubermq.jms.server.MessageServer.DEFAULT_PORT); } public UnicastConnectionFactory(URI uri) { ! this.icd = FailoverConnectionDescriptor.parseFailoverSpec(uri, com.ubermq.jms.server.MessageServer.DEFAULT_PORT); } --- 33,42 ---- public UnicastConnectionFactory(String url) { ! this.icd = FailoverConnectionDescriptor.parseFailoverSpec(url, com.ubermq.jms.common.MessageConstants.DEFAULT_PORT); } public UnicastConnectionFactory(URI uri) { ! this.icd = FailoverConnectionDescriptor.parseFailoverSpec(uri, com.ubermq.jms.common.MessageConstants.DEFAULT_PORT); } |
From: <ji...@us...> - 2004-01-21 02:05:30
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/proc In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/client/proc Modified Files: LrmpClientProc.java ClientProc.java Log Message: bug fixes related to read/write initialization race conditions Index: LrmpClientProc.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/proc/LrmpClientProc.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** LrmpClientProc.java 19 Sep 2002 17:30:32 -0000 1.3 --- LrmpClientProc.java 21 Jan 2004 02:05:27 -0000 1.4 *************** *** 1,5 **** package com.ubermq.jms.client.proc; ! import com.ubermq.jms.server.datagram.IControlDatagramFactory; import sun.security.krb5.internal.crypto.f; --- 1,5 ---- package com.ubermq.jms.client.proc; ! import com.ubermq.jms.common.datagram.IControlDatagramFactory; import sun.security.krb5.internal.crypto.f; Index: ClientProc.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/proc/ClientProc.java,v retrieving revision 1.25 retrieving revision 1.26 diff -C2 -d -r1.25 -r1.26 *** ClientProc.java 12 Jan 2004 19:24:24 -0000 1.25 --- ClientProc.java 21 Jan 2004 02:05:27 -0000 1.26 *************** *** 4,10 **** import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; ! import com.ubermq.jms.server.datagram.*; ! import com.ubermq.jms.server.routing.*; ! import com.ubermq.jms.server.routing.impl.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; --- 4,10 ---- import com.ubermq.jms.client.*; import com.ubermq.jms.client.impl.*; ! import com.ubermq.jms.common.datagram.*; ! import com.ubermq.jms.common.routing.*; ! import com.ubermq.jms.common.routing.impl.*; import com.ubermq.kernel.*; import com.ubermq.kernel.overflow.*; *************** *** 20,23 **** --- 20,25 ---- implements com.ubermq.jms.client.IClientProcessor { + private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(ClientProc.class); + /** * Our routers, one for subscriptions, one for ack's. *************** *** 30,38 **** */ private final Object controlAckNotifier; ! /** * Whether a control ACK succeeded. */ private SynchronizedBoolean controlAckSuccess; /** --- 32,45 ---- */ private final Object controlAckNotifier; ! /** * Whether a control ACK succeeded. */ private SynchronizedBoolean controlAckSuccess; + + /** + * Whether a control ACK was received, either positive or negative. + */ + private SynchronizedBoolean controlAckWasNotified; /** *************** *** 66,70 **** private static final boolean SHOULD_WAIT_FOR_ACK = Boolean.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_SHOULD_WAIT_FOR_ACK, "true")).booleanValue(); ! private static final int RPC_TIMEOUT = 200000; // 2seconds for an RPC ack to come back. /** --- 73,77 ---- private static final boolean SHOULD_WAIT_FOR_ACK = Boolean.valueOf(Configurator.getProperty(ClientConfig.PUBLISH_SHOULD_WAIT_FOR_ACK, "true")).booleanValue(); ! private static final int RPC_TIMEOUT = 10000; // 10seconds for an RPC ack to come back. /** *************** *** 85,88 **** --- 92,96 ---- this.controlAckNotifier = new Object(); this.controlAckSuccess = new SynchronizedBoolean(false); + this.controlAckWasNotified = new SynchronizedBoolean(false); } *************** *** 130,134 **** } } catch(Exception x) { ! com.ubermq.Utility.getLogger().error("", x); } } --- 138,142 ---- } } catch(Exception x) { ! com.ubermq.util.Utility.getLogger().error("", x); } } *************** *** 141,144 **** --- 149,153 ---- { controlAckSuccess.set(!ad.isNegativeAck()); + controlAckWasNotified.set(true); synchronized(controlAckNotifier) { controlAckNotifier.notifyAll(); *************** *** 322,332 **** * @return a boolean indicating if the acknowledgement was * positive or negative. If no acknowledgement is received in the ! * timeout period, false is returned. */ private synchronized boolean outputAndWait(IDatagram d, IOverflowHandler h) { // reset ack success state controlAckSuccess.set(false); // output the data gram. --- 331,347 ---- * @return a boolean indicating if the acknowledgement was * positive or negative. If no acknowledgement is received in the ! * timeout period, an Exception is thrown. ! * @throws IllegalStateException the datagram was not acknowledged ! * in the timeout period, indicating a problem in the infrastructure. */ private synchronized boolean outputAndWait(IDatagram d, IOverflowHandler h) + throws IllegalStateException { + log.debug("Outputting RPC datagram " + d + " on conn " + this.managedConn); + // reset ack success state controlAckSuccess.set(false); + controlAckWasNotified.set(false); // output the data gram. *************** *** 346,350 **** --- 361,372 ---- try { + log.debug("Waiting for RPC reply on conn " + this.managedConn); controlAckNotifier.wait(RPC_TIMEOUT); + if (!controlAckWasNotified.get()) + { + RuntimeException x = new IllegalStateException("Datagram was not acknowledged in the timeout period."); + log.debug("RPC reply timed out", x); + throw x; + } } catch (InterruptedException e) {} *************** *** 353,363 **** return controlAckSuccess.get(); } private final static class TopicSourceSpec ! extends com.ubermq.jms.server.routing.impl.RegexpSourceSpec { public TopicSourceSpec(String spec) { ! super(com.ubermq.jms.server.routing.impl.RegexpHelper.xlat(spec), spec); } --- 375,390 ---- return controlAckSuccess.get(); } + + public String toString() + { + return "ClientProc for " + this.managedConn.toString(); + } private final static class TopicSourceSpec ! extends com.ubermq.jms.common.routing.impl.RegexpSourceSpec { public TopicSourceSpec(String spec) { ! super(com.ubermq.jms.common.routing.impl.RegexpHelper.xlat(spec), spec); } |
From: <ji...@us...> - 2004-01-21 02:05:30
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/datagram/impl In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/server/datagram/impl Removed Files: ServerDatagramFactory.java AckDatagram.java ControlDatagram.java ServerMessageDatagram.java DatagramFactory.java DelimitedDatagramFactory.java DatagramTestCase.java MessageDatagram.java Log Message: bug fixes related to read/write initialization race conditions --- ServerDatagramFactory.java DELETED --- --- AckDatagram.java DELETED --- --- ControlDatagram.java DELETED --- --- ServerMessageDatagram.java DELETED --- --- DatagramFactory.java DELETED --- --- DelimitedDatagramFactory.java DELETED --- --- DatagramTestCase.java DELETED --- --- MessageDatagram.java DELETED --- |
From: <ji...@us...> - 2004-01-21 02:05:29
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/common/routing/impl In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/common/routing/impl Added Files: SimpleSelector.java ConnectionDestNode.java StaticSourceSpec.java SelectorDestNode.java Router.java RegexpHelper.java RegexpSourceSpec.java Log Message: bug fixes related to read/write initialization race conditions --- NEW FILE: SimpleSelector.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.datagram.*; import com.ubermq.jms.common.routing.*; import java.util.regex.*; import javax.jms.*; /** * A VERY trivial implementation of message selectors. * we only support "WHERE property = scalar" */ public class SimpleSelector implements Selector { private String property, scalar; private int operator; private boolean valid; private static final int EQUALS = 0, GREATER = 1, LESS = 2, NOTEQUAL = 3; public static final String WHERE_REGEX = "where\\s*(\\S*)\\s*(=|>|<|!=|<>)\\s*'??([^']*)'??"; public SimpleSelector(String sz) throws InvalidSelectorException { // parse the string. Pattern p = Pattern.compile(WHERE_REGEX, Pattern.CASE_INSENSITIVE); Matcher m = p.matcher(sz); if (m.matches()) { property = m.group(1); scalar = m.group(3); String op = m.group(2); if (op.equals("=")) operator = EQUALS; else if (op.equals("<>") || op.equals("!=")) operator = NOTEQUAL; else if (op.equals(">")) operator = GREATER; else if (op.equals("<")) operator = LESS; else operator = EQUALS; if (scalar.equalsIgnoreCase("NULL")) scalar = null; valid = true; } else { throw new InvalidSelectorException(""); } } public boolean accept(IMessageDatagram msg) { Object value = null; // if we are not valid, return false for everything. if (!valid) return false; if (property.equals("JMSDeliveryMode")) { value = msg.getStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE); } else if (property.equals("JMSPriority")) { value = msg.getStandardProperty(IMessageDatagram.STDPROP_PRIORITY); } else if (property.equals("JMSMessageID")) { value = msg.getMessageId(); } else if (property.equals("JMSTimestamp")) { value = msg.getStandardProperty(IMessageDatagram.STDPROP_TIMESTAMP); } else if (property.equals("JMSCorrelationID")) { value = msg.getStandardProperty(IMessageDatagram.STDPROP_CORRELATIONID); } else { value = msg.getCustomProperty(property); } com.ubermq.util.Utility.getLogger().debug("selector comparing " + property + " containing " + value + " with " + scalar); if (value == null || scalar == null) { return (value == null && scalar == null && operator == EQUALS); } else { switch(operator) { case EQUALS: return (value.toString().equals(scalar)); case NOTEQUAL: return !(value.toString().equals(scalar)); default: return false; } } } public String toString() { return "Property: " + property + " operator: " + operator + " scalar: " + scalar; } } --- NEW FILE: ConnectionDestNode.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.routing.*; import com.ubermq.kernel.*; import java.io.*; /** * Implements a destination that contains information about a connected peer. * The message server will use this information from the router to forward * messages to their logical destinations. */ public class ConnectionDestNode implements RouteDestNode, DatagramSink, Comparable, java.io.Serializable { private IConnectionInfo conn; public ConnectionDestNode(IConnectionInfo ci) {this.conn = ci;} public final IConnectionInfo getConnection() {return conn;} public boolean isOpen() {return getConnection().isOpen();} public void output(IDatagram d, IOverflowHandler h) throws IOException { conn.output(d, h); } public String getDisplayName() {return conn.toString();} public String getNodeName() {return conn.getId();} public String toString() {return conn.toString();} public boolean equals(Object o) { if (o instanceof RouteDestNode) { return (getNodeName().equals(((RouteDestNode)o).getNodeName())); } else { return false; } } public int compareTo(Object obj) throws ClassCastException { RouteDestNode dn = (RouteDestNode)obj; return (getNodeName().compareTo(dn.getNodeName())); } public int hashCode() {return getNodeName().hashCode();} } --- NEW FILE: StaticSourceSpec.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.routing.*; public class StaticSourceSpec implements SourceSpec, java.io.Serializable { private String expr; /** * Constructs a static source specifier, from * the given string expression. * @param expr the expression representing the source specifier. */ public StaticSourceSpec(String expr) { this.expr = expr; } public String getDisplayName() {return expr;} public String toString() {return expr;} /** * Strictly matches this static specifier and * another implementation of the interface. */ public boolean matches(SourceSpec s) { return expr.equals(s.toString()); } public boolean isMoreSpecificThan(SourceSpec s) { return false; } public int hashCode() { return expr.hashCode(); } public boolean equals(Object o) { if (o instanceof SourceSpec) { return this.toString().equals(o.toString()); } else return false; } public boolean shouldCacheResults() {return true;} } --- NEW FILE: SelectorDestNode.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.datagram.*; import com.ubermq.jms.common.routing.*; import com.ubermq.kernel.*; import java.io.*; public class SelectorDestNode extends ConnectionDestNode { private Selector selector; public SelectorDestNode(IConnectionInfo ci, String selector) throws javax.jms.InvalidSelectorException { super(ci); this.selector = new SimpleSelector(selector); com.ubermq.util.Utility.getLogger().debug("selector is " + this.selector.toString()); } /** * expose the output() method for the connection so we can * write to the endpoint. */ public void output(IDatagram d, IOverflowHandler h) throws IOException { if (d instanceof IMessageDatagram) { boolean accept = selector.accept((IMessageDatagram)d); com.ubermq.util.Utility.getLogger().debug("selector returned " + accept); if (!accept) return; } super.output(d, h); } } --- NEW FILE: Router.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.routing.*; import java.util.*; import java.io.*; /** * The core router implementation providing a route table. This implementation * has memoization for faster route resolutions, a statistics interface, * and supports dynamic route table modifications. <P> * * Possible enhancements include: keeping memoized routes if route table * changes would not affect them, better statistics, and a way * to serialize route table information to XML.<P> * * This implementation is not thread-safe. Guard all access of the * router with appropriate synchronization. * */ public final class Router implements IRouter, IConfigurableRouter { private final List excludedRoutes, routes; private final Set nodes; private String myLabel = "undefined"; // memoize our route table private transient final Map memoization; private transient volatile int nCacheHits, nCacheRequests; private static final int MEMOIZATION_INITIAL_SIZE = 50; // constants private static final Set emptySet = Collections.EMPTY_SET; public static final long serialVersionUID=15; public Router() { this.excludedRoutes = new ArrayList(); this.routes = new ArrayList(); this.nodes = new HashSet(); this.memoization = new HashMap(MEMOIZATION_INITIAL_SIZE); } public Router(String label) { this(); setNodeLabel(label); } public void setNodeLabel(String label) { myLabel = label; } public String getNodeLabel() { return myLabel; } public void reset() { excludedRoutes.clear(); routes.clear(); nodes.clear(); resetCache(); } private void resetCache() { memoization.clear(); } public Collection getRoutes(String spec) { return getRoutes(new StaticSourceSpec(spec)); } public Collection getRoutes(SourceSpec spec) { Object cached; nCacheRequests++; if (spec.shouldCacheResults()) { if ((cached = memoization.get(spec)) != null) { nCacheHits++; return ((Collection)cached); } else { Collection routes = reallyGetRoutes(spec); memoization.put(spec,routes); return routes; } } else { return reallyGetRoutes(spec); } } /** * Returns a set of source specifications that are currently * mapped to the specified destination node. This is considered a reverse * lookup and may potentially be costly, depending on implementation.<P> * * @param dest the destination * @return a Collection of SourceSpec objects * @throws UnsupportedOperationException if the router does not * choose to implement this reverse lookup function. */ public Collection getRoutesTo(RouteDestNode dest) { Set toRemove = new HashSet(); for(Iterator i = routes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); if (br.dest.equals(dest)) toRemove.add(br.src); } return toRemove; } /** * Does a reverse-lookup and returns a collection of * <code>BoundRoute</code> objects that are the mappings * such that <code>BoundRoute.dest.equals(dest)</code>. * * @return Collection of BoundRoute objects * @param dest the destination */ private Collection getBoundRoutesTo(RouteDestNode dest) { Set toRemove = new HashSet(); for(Iterator i = routes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); if (br.dest.equals(dest)) toRemove.add(br); } return toRemove; } private Set getExcludedSet(SourceSpec spec) { Set exclusionSet = new TreeSet(); // collect a list of nodes that we are excluded from. for(Iterator i = excludedRoutes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); if (br.src.matches(spec)) { exclusionSet.add(br.dest); } } return exclusionSet; } public Set getKnownNodes() { return nodes; } private Collection reallyGetRoutes(SourceSpec spec) { Set exclusionSet, sendSet; // if we are excluded from everywhere that we know about, // shortcut and don't look at any more routes. exclusionSet = getExcludedSet( spec ); if (exclusionSet.containsAll(nodes)) return emptySet; Map preSendMap = new TreeMap(); for(Iterator i = routes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); if (br.src.matches(spec)) { Object existingSource = preSendMap.get(br.dest); if (existingSource == null || br.src.isMoreSpecificThan(((BoundRoute)existingSource).src)) { preSendMap.put(br.dest, br); } } } // take the dest's from the sendmap as sendSet sendSet = new TreeSet(); for(Iterator i = preSendMap.values().iterator();i.hasNext();) { sendSet.add( ((BoundRoute)i.next()).getDestination() ); } // well now we just take {routed - excluded} and give it back // we will exclude any nodes that we do not know about. sendSet.retainAll(nodes); sendSet.removeAll(exclusionSet); return sendSet; } public void addRoute(SourceSpec spec, RouteDestNode rdn) { routes.remove( new BoundRoute(spec, rdn) ); routes.add( new BoundRoute(spec, rdn) ); resetCache(); } public void excludeRoute(SourceSpec spec, RouteDestNode rdn) { excludedRoutes.add( new BoundRoute(spec, rdn) ); resetCache(); } public void remove(SourceSpec spec, RouteDestNode rdn) { // remove this one from either routes and/or excluded routes BoundRoute br = new BoundRoute(spec, rdn); routes.remove(br); resetCache(); } public void removeExclusion(SourceSpec spec, RouteDestNode rdn) { // remove this one from either routes and/or excluded routes BoundRoute br = new BoundRoute(spec, rdn); excludedRoutes.remove(br); resetCache(); } public void removeRoutesTo(RouteDestNode node) { // remove routes Collection toRemove = getBoundRoutesTo(node); for(Iterator j = toRemove.iterator();j.hasNext();) routes.remove(j.next()); // remove exclusions toRemove.clear(); for(Iterator i = excludedRoutes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); if (br.dest.equals(node)) toRemove.add(br); } for(Iterator j = toRemove.iterator();j.hasNext();) excludedRoutes.remove(j.next()); } public void addKnownNode(RouteDestNode node) { nodes.add(node); resetCache(); } public void removeKnownNode(RouteDestNode node) { nodes.remove(node); resetCache(); } public String toString() { StringBuffer sb = new StringBuffer(); sb.append("Routes:\n"); for(Iterator i = routes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); sb.append(br.toString()); sb.append("\n"); } if (excludedRoutes.size() > 0) { sb.append("\nExcluded Routes:\n"); for(Iterator i = excludedRoutes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); sb.append(br.toString()); sb.append("\n"); } } sb.append("\nNodes: "); for(Iterator i = nodes.iterator();i.hasNext();) { sb.append( ((RouteDestNode)i.next()).toString() ).append(" "); } sb.append("\n"); return sb.toString(); } private static class BoundRoute { private SourceSpec src; private RouteDestNode dest; private BoundRoute(SourceSpec src, RouteDestNode rdn) { this.src = src; this.dest = rdn; } public BoundRoute() { } public RouteDestNode getDestination() {return dest;} public String toString() { return src.toString() + " -> " + dest.toString(); } public boolean equals(Object obj) { try { BoundRoute br = (BoundRoute)obj; return (br.src.equals(src) && br.dest.equals(dest)); } catch(ClassCastException cce) { return false; } } } } --- NEW FILE: RegexpHelper.java --- package com.ubermq.jms.common.routing.impl; /** * A helper class providing regex translation facilities. */ public class RegexpHelper { /** * The UberMQ topic matching scheme is quite simple, but provides * access to pure regex if that functionality is desired. * <P> * * <code>*</code> matches exactly one topic level, e.g. * <code>a.b.*</code> matches <code>a.b.x</code>, <code>a.b.y</code>, * but not <code>a.b.y.z</code> * <P> * * <code>#</code> matches any number of topic levels, e.g. * <code>a.b.#</code> matches <code>a.b.x</code>, <code>a.b.y</code>, * and <code>a.b.y.z</code> * <P> * * Topic spaces preceded by a . will not be matched by wildcards, * so .secret will not be matched by #, nor *. This serves * to hide topics from aggressive wildcarding. These topics can be * subscribed to by explicitly naming them. * <P> * * System topics begin with a <code>.$</code>. Please do not subscribe to or use these, * or your application behavior will be undefined. * <P> * * Finally, for regex fans, the underlying regex engine can be * accessed by specifying a tilde (~) as the first character of the topic name. * So, <code>~.*</code> will match all topic names, even hidden or system topics. */ public static String xlat(String friendly) { // bypass the translation if the first character is a ~ if (friendly.length() > 0 && friendly.charAt(0) == '~') return friendly.substring(1); StringBuffer regexp = new StringBuffer(friendly); replaceChar(regexp, '$', "\\$"); replaceChar(regexp, '.', "\\."); replaceChar(regexp, '*', "[^\\.]*"); replaceChar(regexp, '#', "[^\\.].*"); return regexp.toString(); } /** * Replaces all instances of a character in a string buffer with * a character sequence. */ public static void replaceChar(StringBuffer regexp, char ch, String repl) { for(int i=0;i < regexp.length();i++) { if (regexp.charAt(i) == ch) { regexp.replace(i, i+1, repl); i += repl.length(); } } } } --- NEW FILE: RegexpSourceSpec.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.routing.*; import java.io.*; import java.util.regex.*; /** * A source specification that uses regular expressions to * determine if two specifications match. */ public class RegexpSourceSpec implements Externalizable, SourceSpec { private transient Pattern regexp; private String expr; private String nice; private static final long serialVersionUID = 5; public RegexpSourceSpec() { } public RegexpSourceSpec(String expr) { this(expr, expr); } public RegexpSourceSpec(String expr, String nice) throws IllegalArgumentException { this.expr = expr; this.nice = nice; this.regexp = Pattern.compile(this.expr, Pattern.CASE_INSENSITIVE); } public void readExternal(ObjectInput in) throws IOException { nice = in.readUTF(); expr = in.readUTF(); regexp = Pattern.compile(expr, Pattern.CASE_INSENSITIVE); } public void writeExternal(ObjectOutput out) throws IOException { out.writeUTF(nice); out.writeUTF(expr); } /** * @return a human readable name for this destination. */ public String getDisplayName() { return nice; } /** * An implementation can return true to indicate that * the results of this routing should be cached. * Mostly this should return true except if the created * source spec is a throwaway and will only ever match once. */ public boolean shouldCacheResults() { return true; } public boolean matches(SourceSpec s) { return regexp.matcher(s.toString()).matches(); } public boolean isMoreSpecificThan(SourceSpec s) { try { RegexpSourceSpec rss = (RegexpSourceSpec)s; return (specificity() > rss.specificity()); } catch(ClassCastException cce) {return false;} } private int specificity() { int score = 0; java.util.StringTokenizer st = new java.util.StringTokenizer(nice, "."); while(st.hasMoreTokens()) { String token = st.nextToken(); char ch = token.charAt(0); if (Character.isLetterOrDigit(ch)) score += 3; else if (ch == '*') score += 2; else if (ch == '#') score += 1; } return score; } public String toString() { return nice; } public boolean equals(Object obj) { if(obj instanceof RegexpSourceSpec) { return nice.equals( ((RegexpSourceSpec)obj).nice ); } else if (obj instanceof SourceSpec) return matches((SourceSpec)obj); else return false; } } |
From: <ji...@us...> - 2004-01-21 02:05:29
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/multicast In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/client/multicast Modified Files: LrmpConnectionInfo.java MulticastTopicConnection.java Log Message: bug fixes related to read/write initialization race conditions Index: LrmpConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/multicast/LrmpConnectionInfo.java,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 *** LrmpConnectionInfo.java 12 Jan 2004 19:24:24 -0000 1.12 --- LrmpConnectionInfo.java 21 Jan 2004 02:05:26 -0000 1.13 *************** *** 1,5 **** package com.ubermq.jms.client.multicast; ! import com.ubermq.jms.server.datagram.impl.*; import com.ubermq.kernel.*; import inria.net.lrmp.*; --- 1,5 ---- package com.ubermq.jms.client.multicast; ! import com.ubermq.jms.common.datagram.impl.*; import com.ubermq.kernel.*; import inria.net.lrmp.*; *************** *** 67,71 **** profile); } catch(LrmpException e) { ! com.ubermq.Utility.getLogger().error("", e); throw new java.io.IOException(e.toString()); } --- 67,71 ---- profile); } catch(LrmpException e) { ! com.ubermq.util.Utility.getLogger().error("", e); throw new java.io.IOException(e.toString()); } *************** *** 122,126 **** if (readBuffer.position() != 0) ! com.ubermq.Utility.getLogger().fatal("processData() position is nonzero"); readBuffer.put(pack.getDataBuffer(), pack.getOffset(), pack.getDataLength()); } --- 122,126 ---- if (readBuffer.position() != 0) ! com.ubermq.util.Utility.getLogger().fatal("processData() position is nonzero"); readBuffer.put(pack.getDataBuffer(), pack.getOffset(), pack.getDataLength()); } *************** *** 144,148 **** } catch (javax.jms.JMSException e) { ! com.ubermq.Utility.getLogger().error("", e); } break; --- 144,148 ---- } catch (javax.jms.JMSException e) { ! com.ubermq.util.Utility.getLogger().error("", e); } break; Index: MulticastTopicConnection.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/multicast/MulticastTopicConnection.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** MulticastTopicConnection.java 12 Dec 2002 18:01:48 -0000 1.8 --- MulticastTopicConnection.java 21 Jan 2004 02:05:26 -0000 1.9 *************** *** 7,11 **** import com.ubermq.jms.client.multicast.LrmpClientSession; import com.ubermq.jms.client.proc.LrmpClientProc; ! import com.ubermq.jms.server.datagram.impl.DatagramFactory; import java.io.IOException; --- 7,11 ---- import com.ubermq.jms.client.multicast.LrmpClientSession; import com.ubermq.jms.client.proc.LrmpClientProc; ! import com.ubermq.jms.common.datagram.impl.DatagramFactory; import java.io.IOException; |
From: <ji...@us...> - 2004-01-21 02:05:29
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/server Modified Files: ServerConfig.java MessageServer.java ClusteredServer.java Added Files: ServerTestCase.java Log Message: bug fixes related to read/write initialization race conditions --- NEW FILE: ServerTestCase.java --- /* * Contents copyright 2002-03 Rhombus Technologies. */ package com.ubermq.jms.server; import java.util.*; import javax.jms.*; import junit.framework.*; import com.ubermq.jms.client.*; import com.ubermq.jms.client.test.*; /** * Tests various aspects of the server, such as clustering and direct pipe connections.<P> * @author jp * @since 2.4 */ public class ServerTestCase extends TestCase { public ServerTestCase(String arg0) { super(arg0); } /** * Tests Clustering. */ public void testClustering() throws JMSException { Properties p = new Properties(); p.put("server.port", "5001"); MessageServer ms = new MessageServer(p); ms.addStandardProtocols(); ms.run(); p.put("server.port", "5002"); p.put("clustering.enable", "true"); p.put("clustering.forward", "ubermq://localhost:5001"); MessageServer ms2 = new MessageServer(p); ms2.addStandardProtocols(); ms2.run(); TopicConnectionFactory localFactory = new URLTopicConnectionFactory("ubermq://localhost:5001"), remoteFactory = new URLTopicConnectionFactory("ubermq://localhost:5002"); RegressionTestCase.sendAndReceive(localFactory, remoteFactory, "A", "A", null, 20, 20); RegressionTestCase.sendAndReceive(localFactory, "hello", "hello", null, 20, 20); RegressionTestCase.sendAndReceive(localFactory, "B", "B", "where ordinal != 2", 10, 9); RegressionTestCase.sendAndReceive(remoteFactory, localFactory, "B", "B", null, 20, 20); } /** * Tests pipe connections and makes sure they work w/ themselves, and others. */ public void testPipes() throws JMSException { Properties p = new Properties(); p.put("server.port", "5003"); MessageServer ms = new MessageServer(p); ms.addStandardProtocols(); ms.run(); TopicConnectionFactory localFactory = new PipeConnectionFactory(ms), remoteFactory = new URLTopicConnectionFactory(ms.getServiceUrl()); System.out.println("testPipes"); RegressionTestCase.sendAndReceive(localFactory, remoteFactory, "A", "A", null, 20, 20); RegressionTestCase.sendAndReceive(localFactory, "hello", "hello", null, 20, 20); RegressionTestCase.sendAndReceive(localFactory, "B", "B", "where ordinal != 2", 10, 9); RegressionTestCase.sendAndReceive(remoteFactory, localFactory, "B", "B", null, 20, 20); } } Index: ServerConfig.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/ServerConfig.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** ServerConfig.java 24 Jan 2003 15:34:49 -0000 1.9 --- ServerConfig.java 21 Jan 2004 02:05:26 -0000 1.10 *************** *** 29,33 **** * to retrieve an instance of the object. It must also implement * IDatagramFactory and IAckDatagramFactory. ! * @see com.ubermq.jms.server.datagram.IDatagramFactory, * com.ubermq.jms.server.datagram.IAckDatagramFactory */ --- 29,33 ---- * to retrieve an instance of the object. It must also implement * IDatagramFactory and IAckDatagramFactory. ! * @see com.ubermq.jms.common.datagram.IDatagramFactory, * com.ubermq.jms.server.datagram.IAckDatagramFactory */ *************** *** 119,122 **** --- 119,123 ---- public static final String DURABLE_LOG_SIZE = "server.durable.logsize"; + ///////////// DATAGRAM PROCESSOR *************** *** 138,148 **** /** - * the size of the buffer allocated for custom properties. this can be small - * if you are not actively using properties, but make it large if you - * are putting objects,etc. in there - */ - public static final String DGP_MAXIMUM_PROPERTY_LENGTH = "server.dgram.maximumprops"; - - /** * whether our clients care about ACK packets or not. This should usually be true * to throttle publishers to a reasonable pace. --- 139,142 ---- *************** *** 161,164 **** --- 155,159 ---- public static final String DGP_OVERFLOW_HANDLER_INIT = "server.dgram.overflowhandler.init"; + ////////////////////////// SSL Index: MessageServer.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/MessageServer.java,v retrieving revision 1.46 retrieving revision 1.47 diff -C2 -d -r1.46 -r1.47 *** MessageServer.java 12 Jan 2004 19:24:25 -0000 1.46 --- MessageServer.java 21 Jan 2004 02:05:26 -0000 1.47 *************** *** 4,9 **** import com.ubermq.jms.server.admin.*; import com.ubermq.jms.server.cluster.*; ! import com.ubermq.jms.server.datagram.*; ! import com.ubermq.jms.server.datagram.impl.*; import com.ubermq.jms.server.journal.*; import com.ubermq.jms.server.journal.impl.*; --- 4,10 ---- import com.ubermq.jms.server.admin.*; import com.ubermq.jms.server.cluster.*; ! import com.ubermq.jms.common.*; ! import com.ubermq.jms.common.datagram.*; ! import com.ubermq.jms.common.datagram.impl.*; import com.ubermq.jms.server.journal.*; import com.ubermq.jms.server.journal.impl.*; *************** *** 55,59 **** private ReadWriteTransformThread[] read, write; - public static final int DEFAULT_PORT = 3999; private static final int CLUSTER_MAX_CXNS = 2; private static final String ALL_TOPICS = "#"; --- 56,59 ---- *************** *** 167,171 **** read = new ReadWriteTransformThread[RW_THREAD_COUNT]; write = new ReadWriteTransformThread[RW_THREAD_COUNT]; ! com.ubermq.Utility.getLogger().debug("Creating read/write pool of size " + RW_THREAD_COUNT); for (int i = 0; i < RW_THREAD_COUNT; i++) { --- 167,171 ---- read = new ReadWriteTransformThread[RW_THREAD_COUNT]; write = new ReadWriteTransformThread[RW_THREAD_COUNT]; ! com.ubermq.util.Utility.getLogger().debug("Creating read/write pool of size " + RW_THREAD_COUNT); for (int i = 0; i < RW_THREAD_COUNT; i++) { *************** *** 192,200 **** { int i = new Random().nextInt(read.length); ! read[i].register((ConnectionInfo)incoming); ! write[i].register((ConnectionInfo)incoming); } }); ! com.ubermq.Utility.getLogger().info("Protocol " + p.toString() + " started"); if (serviceURI == null) --- 192,200 ---- { int i = new Random().nextInt(read.length); ! write[i].register((ConnectionInfo)incoming, true); ! read[i].register((ConnectionInfo)incoming, true); } }); ! com.ubermq.util.Utility.getLogger().info("Protocol " + p.toString() + " started"); if (serviceURI == null) *************** *** 208,212 **** catch(Exception x) { ! com.ubermq.Utility.getLogger().error("", x);; } --- 208,212 ---- catch(Exception x) { ! com.ubermq.util.Utility.getLogger().error("", x);; } *************** *** 251,255 **** protected void onClusterAnnouncement(String url) { ! com.ubermq.Utility.getLogger().info("joining cluster member at " + url); if (clusterCxnCount < CLUSTER_MAX_CXNS) { --- 251,255 ---- protected void onClusterAnnouncement(String url) { ! com.ubermq.util.Utility.getLogger().info("joining cluster member at " + url); if (clusterCxnCount < CLUSTER_MAX_CXNS) { *************** *** 289,293 **** * implementation.<p> * ! * @see com.ubermq.jms.server.datagram.impl.DatagramFactory * @return a datagram factory holder */ --- 289,293 ---- * implementation.<p> * ! * @see com.ubermq.jms.common.datagram.impl.DatagramFactory * @return a datagram factory holder */ *************** *** 350,355 **** dp); dp.accept(ci); ! read[0].register((ConnectionInfo)ci); ! write[0].register((ConnectionInfo)ci); return ci; } --- 350,355 ---- dp); dp.accept(ci); ! write[0].register((ConnectionInfo)ci, true); ! read[0].register((ConnectionInfo)ci, true); return ci; } *************** *** 365,369 **** { return Integer.valueOf(Configurator.getProperty(ConfigConstants.SERVER_PORT, ! String.valueOf(DEFAULT_PORT))).intValue(); } --- 365,369 ---- { return Integer.valueOf(Configurator.getProperty(ConfigConstants.SERVER_PORT, ! String.valueOf(MessageConstants.DEFAULT_PORT))).intValue(); } *************** *** 415,419 **** public String toString() { ! return port == DEFAULT_PORT ? "UberMQ" : ("UberMQ (" + port + ")"); } --- 415,419 ---- public String toString() { ! return port == MessageConstants.DEFAULT_PORT ? "UberMQ" : ("UberMQ (" + port + ")"); } *************** *** 428,432 **** try { ! if (port == DEFAULT_PORT) return URI.create("ubermq://" + InetAddress.getLocalHost().getHostName()); else --- 428,432 ---- try { ! if (port == MessageConstants.DEFAULT_PORT) return URI.create("ubermq://" + InetAddress.getLocalHost().getHostName()); else *************** *** 476,480 **** serviceURI = URI.create("//" + InetAddress.getLocalHost().getHostName() + ":" + ADMIN_PORT + "/" + ADMIN_SERVICE_NAME); ! com.ubermq.Utility.getLogger().info("Administrative service running at " + serviceURI.toString()); } catch (java.rmi.RemoteException e) --- 476,480 ---- serviceURI = URI.create("//" + InetAddress.getLocalHost().getHostName() + ":" + ADMIN_PORT + "/" + ADMIN_SERVICE_NAME); ! com.ubermq.util.Utility.getLogger().info("Administrative service running at " + serviceURI.toString()); } catch (java.rmi.RemoteException e) *************** *** 535,539 **** // run it s.run(); ! com.ubermq.Utility.getLogger().info("UberMQ " + com.ubermq.jms.client.impl.Connection.UBERMQ_PROVIDER_VERSION + " running at " + s.getServiceUrl()); } --- 535,539 ---- // run it s.run(); ! com.ubermq.util.Utility.getLogger().info("UberMQ " + com.ubermq.jms.client.impl.Connection.UBERMQ_PROVIDER_VERSION + " running at " + s.getServiceUrl()); } *************** *** 574,578 **** { // fall through ! com.ubermq.Utility.getLogger().error("", e2); } } --- 574,578 ---- { // fall through ! com.ubermq.util.Utility.getLogger().error("", e2); } } Index: ClusteredServer.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/ClusteredServer.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** ClusteredServer.java 12 Jan 2004 19:24:25 -0000 1.4 --- ClusteredServer.java 21 Jan 2004 02:05:26 -0000 1.5 *************** *** 107,111 **** catch (Exception e) { ! com.ubermq.Utility.getLogger().error("", e); } finally --- 107,111 ---- catch (Exception e) { ! com.ubermq.util.Utility.getLogger().error("", e); } finally *************** *** 148,152 **** catch (Exception e) { ! com.ubermq.Utility.getLogger().error("", e); } } --- 148,152 ---- catch (Exception e) { ! com.ubermq.util.Utility.getLogger().error("", e); } } |
Update of /cvsroot/ubermq/jms/src/com/ubermq/kernel In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/kernel Modified Files: AcceptThread.java AbstractDatagram.java KernelBasedServer.java ReadWriteTransformThread.java AbstractConnectionInfo.java ConnectionInfo.java Log Message: bug fixes related to read/write initialization race conditions Index: AcceptThread.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AcceptThread.java,v retrieving revision 1.16 retrieving revision 1.17 diff -C2 -d -r1.16 -r1.17 *** AcceptThread.java 12 Jan 2004 19:24:26 -0000 1.16 --- AcceptThread.java 21 Jan 2004 02:05:25 -0000 1.17 *************** *** 51,55 **** ssc.socket().bind(address); ! com.ubermq.Utility.getLogger().debug("Bound to " + address); listenKey = ssc.register(this.connectSelector, SelectionKey.OP_ACCEPT); } --- 51,55 ---- ssc.socket().bind(address); ! com.ubermq.util.Utility.getLogger().debug("Bound to " + address); listenKey = ssc.register(this.connectSelector, SelectionKey.OP_ACCEPT); } *************** *** 63,67 **** acceptPendingConnections(); } catch(Exception ex) { ! com.ubermq.Utility.getLogger().error("", ex); } } --- 63,67 ---- acceptPendingConnections(); } catch(Exception ex) { ! com.ubermq.util.Utility.getLogger().error("", ex); } } *************** *** 77,85 **** } catch (IOException e) { ! com.ubermq.Utility.getLogger().error("", e); } // we are done ! com.ubermq.Utility.getLogger().info("Server listener shut down."); } --- 77,85 ---- } catch (IOException e) { ! com.ubermq.util.Utility.getLogger().error("", e); } // we are done ! com.ubermq.util.Utility.getLogger().info("Server listener shut down."); } *************** *** 96,100 **** incomingChannel.configureBlocking(false); configureSocket(incomingChannel.socket()); ! com.ubermq.Utility.getLogger().debug("AcceptThread: Connection from " + incomingChannel.socket().getInetAddress()); // create a socket connection info instance --- 96,100 ---- incomingChannel.configureBlocking(false); configureSocket(incomingChannel.socket()); ! com.ubermq.util.Utility.getLogger().debug("AcceptThread: Connection from " + incomingChannel.socket().getInetAddress()); // create a socket connection info instance Index: AbstractDatagram.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractDatagram.java,v retrieving revision 1.16 retrieving revision 1.17 diff -C2 -d -r1.16 -r1.17 *** AbstractDatagram.java 12 Jan 2004 19:24:26 -0000 1.16 --- AbstractDatagram.java 21 Jan 2004 02:05:25 -0000 1.17 *************** *** 2,5 **** --- 2,7 ---- import com.ubermq.*; + import com.ubermq.util.*; + import java.nio.*; import java.nio.charset.*; *************** *** 204,208 **** } catch(java.io.IOException iox) { ! com.ubermq.Utility.getLogger().error("", iox); } } --- 206,210 ---- } catch(java.io.IOException iox) { ! com.ubermq.util.Utility.getLogger().error("", iox); } } Index: KernelBasedServer.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/KernelBasedServer.java,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 *** KernelBasedServer.java 12 Jan 2004 19:24:26 -0000 1.12 --- KernelBasedServer.java 21 Jan 2004 02:05:25 -0000 1.13 *************** *** 30,34 **** catch (java.io.IOException e) { ! com.ubermq.Utility.getLogger().error("", e); } finally --- 30,34 ---- catch (java.io.IOException e) { ! com.ubermq.util.Utility.getLogger().error("", e); } finally Index: ReadWriteTransformThread.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ReadWriteTransformThread.java,v retrieving revision 1.24 retrieving revision 1.25 diff -C2 -d -r1.24 -r1.25 *** ReadWriteTransformThread.java 12 Jan 2004 19:24:26 -0000 1.24 --- ReadWriteTransformThread.java 21 Jan 2004 02:05:25 -0000 1.25 *************** *** 20,23 **** --- 20,24 ---- private Selector selector; private List toRegister, toUnregister, toEnable, toDisable; + private Object serviceNotifier; private int operation; private int spinDetector; *************** *** 42,45 **** --- 43,47 ---- this.toEnable = Collections.synchronizedList(new LinkedList()); this.toDisable = Collections.synchronizedList(new LinkedList()); + this.serviceNotifier = new Object(); this.operation = operation; this.spinDetector = 0; *************** *** 115,122 **** } ! public void register(ConnectionInfo ci) { ! toRegister.add(ci); ! selector.wakeup(); } --- 117,147 ---- } ! /** ! * Registers the connection with this transform IO thread for later ! * servicing. This method can be performed synchronously, if desired.<P> ! * ! * @param ci the connection ! * @param sync true if the method should block until registration has ! * completed. ! */ ! public void register(ConnectionInfo ci, boolean sync) { ! synchronized(serviceNotifier) ! { ! toRegister.add(ci); ! selector.wakeup(); ! ! if (sync) ! { ! try ! { ! serviceNotifier.wait(); ! } ! catch (InterruptedException e) ! { ! log.debug("interrupted waiting for registration", e); ! } ! } ! } } *************** *** 127,140 **** } public void requestService(ConnectionInfo ci) { toEnable.add(ci); selector.wakeup(); } public void cancelServiceRequest(ConnectionInfo ci) { ! toDisable.add(ci); ! selector.wakeup(); } --- 152,187 ---- } + /** + * Requests service for this connection. If a service cancellation + * request is still pending for this connection, it is removed.<P> + * + * @param ci the connection to service + */ public void requestService(ConnectionInfo ci) { toEnable.add(ci); + toDisable.remove(ci); selector.wakeup(); } + /** + * Cancels a service request for this connection. The cancellation + * request will not be honored if there is another caller who has + * registered a pending service request for the same connection. This + * is to prevent a situation where one thread requests service and + * another thread, just having been serviced, cancels it. <P> + * + * In such a case, the cancellation request is ignored and the pending + * service request takes precedence.<P> + * + * @param ci the connection to cancel service for + */ public void cancelServiceRequest(ConnectionInfo ci) { ! if (!toEnable.contains(ci)) ! { ! toDisable.add(ci); ! selector.wakeup(); ! } } *************** *** 190,194 **** { sk.interestOps(operation); ! log.debug("enabled " + conn + " for op " + operation); } else --- 237,241 ---- { sk.interestOps(operation); ! log.debug("enabled " + conn + " for op " + getFriendlyOpName(operation)); } else *************** *** 209,214 **** if (sk != null) { ! sk.interestOps(0); ! log.debug("disabled " + conn + " for op " + operation); } else --- 256,269 ---- if (sk != null) { ! try ! { ! sk.interestOps(0); ! } ! catch(CancelledKeyException cke) ! { ! // ignore this ! log.debug("can't disable - already cancelled " + conn); ! } ! log.debug("disabled " + conn + " for op " + getFriendlyOpName(operation)); } else *************** *** 217,220 **** --- 272,293 ---- toDisable.clear(); } + + // DONE, notify those who are waiting on us. + synchronized(serviceNotifier) + { + serviceNotifier.notifyAll(); + } + } + + private String getFriendlyOpName(int op) + { + if (op == SelectionKey.OP_READ) + return "OP_READ"; + else if (op == SelectionKey.OP_WRITE) + return "OP_WRITE"; + else if (op == SelectionKey.OP_ACCEPT) + return "OP_ACCEPT"; + else + return String.valueOf(op); } *************** *** 234,238 **** { SelectableChannel channel = getChannelForConnection(conn); ! log.debug("registering new channel " + channel + " for cxn " + conn); channel.configureBlocking(false); --- 307,311 ---- { SelectableChannel channel = getChannelForConnection(conn); ! log.debug("registering " + getFriendlyOpName(operation) + ": conn " + conn + ", new channel is " + channel); channel.configureBlocking(false); Index: AbstractConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractConnectionInfo.java,v retrieving revision 1.29 retrieving revision 1.30 diff -C2 -d -r1.29 -r1.30 *** AbstractConnectionInfo.java 12 Jan 2004 19:24:26 -0000 1.29 --- AbstractConnectionInfo.java 21 Jan 2004 02:05:25 -0000 1.30 *************** *** 1,13 **** package com.ubermq.kernel; - import EDU.oswego.cs.dl.util.concurrent.*; - import com.ubermq.*; - import com.ubermq.kernel.*; - import com.ubermq.kernel.event.*; import java.io.*; import java.nio.*; import java.util.*; import org.apache.log4j.*; /** * Implements the IConnectionInfo as an abstract base class, and --- 1,15 ---- package com.ubermq.kernel; import java.io.*; import java.nio.*; import java.util.*; + import org.apache.log4j.*; + import EDU.oswego.cs.dl.util.concurrent.*; + + import com.ubermq.kernel.event.*; + import com.ubermq.util.*; + /** * Implements the IConnectionInfo as an abstract base class, and *************** *** 328,332 **** { writeBuffer.flip(); ! doWrite(writeBuffer); efficientCompact(writeBuffer); } --- 330,335 ---- { writeBuffer.flip(); ! int n = doWrite(writeBuffer); ! log.debug(this + " flushed " + n + " octets"); efficientCompact(writeBuffer); } *************** *** 338,341 **** --- 341,346 ---- catch(java.io.IOException iox) { + log.debug("Unable to write bytes", iox); + // tell the listeners sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); Index: ConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ConnectionInfo.java,v retrieving revision 1.22 retrieving revision 1.23 diff -C2 -d -r1.22 -r1.23 *** ConnectionInfo.java 12 Jan 2004 19:24:26 -0000 1.22 --- ConnectionInfo.java 21 Jan 2004 02:05:26 -0000 1.23 *************** *** 80,85 **** throws IOException { ! if (super.readyToWrite() && ! writeHandler != null) { writeHandler.requestService(this); --- 80,87 ---- throws IOException { ! if (writeHandler == null) ! throw new IllegalStateException("writeHandler cannot be null in requestWrite()"); ! ! if (super.readyToWrite()) { writeHandler.requestService(this); *************** *** 89,96 **** protected void cancelWriteRequest() { ! if (writeHandler != null) ! { ! writeHandler.cancelServiceRequest(this); ! } } --- 91,98 ---- protected void cancelWriteRequest() { ! if (writeHandler == null) ! throw new IllegalStateException("writeHandler cannot be null in cancelWriteRequest()"); ! ! writeHandler.cancelServiceRequest(this); } |