ubermq-commits Mailing List for UberMQ (Page 4)
Brought to you by:
jimmyp
You can subscribe to this list here.
2002 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
(43) |
Sep
(204) |
Oct
(178) |
Nov
(38) |
Dec
(67) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2003 |
Jan
(63) |
Feb
(11) |
Mar
(5) |
Apr
(1) |
May
(8) |
Jun
(29) |
Jul
(3) |
Aug
(3) |
Sep
(5) |
Oct
|
Nov
|
Dec
|
2004 |
Jan
(64) |
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: <ji...@us...> - 2003-06-26 20:23:09
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc In directory sc8-pr-cvs1:/tmp/cvs-serv16828/src/com/ubermq/jms/server/proc Modified Files: DatagramProc.java Log Message: reduce verbosity Index: DatagramProc.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc/DatagramProc.java,v retrieving revision 1.38 retrieving revision 1.39 diff -C2 -d -r1.38 -r1.39 *** DatagramProc.java 16 Jun 2003 00:01:52 -0000 1.38 --- DatagramProc.java 26 Jun 2003 20:23:06 -0000 1.39 *************** *** 414,419 **** catch (IOException e) { - e.printStackTrace(); - // looks like the caller went down. // blow him away. --- 414,417 ---- |
From: <ji...@us...> - 2003-06-26 16:29:01
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/kernel In directory sc8-pr-cvs1:/tmp/cvs-serv8730/src/com/ubermq/kernel Modified Files: AbstractConnectionInfo.java ReadWriteTransformThread.java Log Message: connection failure now kills write request Index: AbstractConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractConnectionInfo.java,v retrieving revision 1.26 retrieving revision 1.27 diff -C2 -d -r1.26 -r1.27 *** AbstractConnectionInfo.java 16 Jun 2003 00:01:53 -0000 1.26 --- AbstractConnectionInfo.java 26 Jun 2003 16:28:58 -0000 1.27 *************** *** 338,341 **** --- 338,342 ---- // free resources. this connection is no longer usable. + cancelWriteRequest(); close(); Index: ReadWriteTransformThread.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ReadWriteTransformThread.java,v retrieving revision 1.18 retrieving revision 1.19 diff -C2 -d -r1.18 -r1.19 *** ReadWriteTransformThread.java 16 Jun 2003 00:01:53 -0000 1.18 --- ReadWriteTransformThread.java 26 Jun 2003 16:28:58 -0000 1.19 *************** *** 13,17 **** extends Thread { ! private Selector readSelector; private ConnectionList acceptedConnections; private int operation; --- 13,17 ---- extends Thread { ! private Selector selector; private ConnectionList acceptedConnections; private int operation; *************** *** 28,32 **** setDaemon(true); ! this.readSelector = selector; this.acceptedConnections = acceptedConnections; this.operation = operation; --- 28,32 ---- setDaemon(true); ! this.selector = selector; this.acceptedConnections = acceptedConnections; this.operation = operation; *************** *** 46,50 **** { registerNewChannels(); ! readSelector.select(); acceptPendingRequests(); --- 46,50 ---- { registerNewChannels(); ! selector.select(); acceptPendingRequests(); *************** *** 57,61 **** // kill all associated connections ! Iterator iter = readSelector.keys().iterator(); while (iter.hasNext()) { --- 57,61 ---- // kill all associated connections ! Iterator iter = selector.keys().iterator(); while (iter.hasNext()) { *************** *** 92,96 **** { channel.configureBlocking(false); ! final SelectionKey inKey = channel.register(readSelector, operation, conn); --- 92,96 ---- { channel.configureBlocking(false); ! final SelectionKey inKey = channel.register(selector, operation, conn); *************** *** 120,124 **** throws IOException { ! Set readyKeys = readSelector.selectedKeys(); for(Iterator i = readyKeys.iterator(); i.hasNext(); ) --- 120,124 ---- throws IOException { ! Set readyKeys = selector.selectedKeys(); for(Iterator i = readyKeys.iterator(); i.hasNext(); ) |
From: <ji...@us...> - 2003-06-16 14:57:55
|
Update of /cvsroot/ubermq/jms In directory sc8-pr-cvs1:/tmp/cvs-serv1142 Modified Files: build.xml Log Message: version bump Index: build.xml =================================================================== RCS file: /cvsroot/ubermq/jms/build.xml,v retrieving revision 1.24 retrieving revision 1.25 diff -C2 -d -r1.24 -r1.25 *** build.xml 16 Jun 2003 14:55:57 -0000 1.24 --- build.xml 16 Jun 2003 14:57:52 -0000 1.25 *************** *** 1,5 **** <project name="ubermq" default="dist" basedir="."> ! <property name="version" value="2.2"/> <property name="src" value="src"/> --- 1,5 ---- <project name="ubermq" default="dist" basedir="."> ! <property name="version" value="2.3"/> <property name="src" value="src"/> |
From: <ji...@us...> - 2003-06-16 14:57:30
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl In directory sc8-pr-cvs1:/tmp/cvs-serv1119/jms/client/impl Modified Files: Connection.java Log Message: version bump Index: Connection.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/Connection.java,v retrieving revision 1.30 retrieving revision 1.31 diff -C2 -d -r1.30 -r1.31 *** Connection.java 27 Mar 2003 15:19:43 -0000 1.30 --- Connection.java 16 Jun 2003 14:57:26 -0000 1.31 *************** *** 28,32 **** // version information public static final int UBERMQ_MAJOR_VERSION = 2; ! public static final int UBERMQ_MINOR_VERSION = 2; public static final int UBERMQ_REVISION = 0; public static final String UBERMQ_PROVIDER_NAME = "UberMQ"; --- 28,32 ---- // version information public static final int UBERMQ_MAJOR_VERSION = 2; ! public static final int UBERMQ_MINOR_VERSION = 3; public static final int UBERMQ_REVISION = 0; public static final String UBERMQ_PROVIDER_NAME = "UberMQ"; |
From: <ji...@us...> - 2003-06-16 14:56:02
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/cluster In directory sc8-pr-cvs1:/tmp/cvs-serv740/src/com/ubermq/jms/server/cluster Modified Files: ClusterPropagation.java Log Message: clustering connections now perform reconnect logic Index: ClusterPropagation.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/cluster/ClusterPropagation.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** ClusterPropagation.java 7 May 2003 17:41:47 -0000 1.10 --- ClusterPropagation.java 16 Jun 2003 14:55:57 -0000 1.11 *************** *** 2,5 **** --- 2,6 ---- import com.ubermq.jms.client.impl.*; + import com.ubermq.jms.client.unicast.*; import javax.jms.*; *************** *** 15,19 **** { public static final String CONFIG_CLUSTERING_SUBSCRIPTION = "clustering.subscription"; ! private Connection local, foreign; private static final String ALL_REGEX = com.ubermq.kernel.Configurator.getProperty(CONFIG_CLUSTERING_SUBSCRIPTION, "#"); --- 16,21 ---- { public static final String CONFIG_CLUSTERING_SUBSCRIPTION = "clustering.subscription"; ! private Connection local; ! private UnicastConnection foreign; private static final String ALL_REGEX = com.ubermq.kernel.Configurator.getProperty(CONFIG_CLUSTERING_SUBSCRIPTION, "#"); *************** *** 28,32 **** { local = localFactory.createConnection(); ! foreign = foreignFactory.createConnection(); forward(local, foreign); --- 30,35 ---- { local = localFactory.createConnection(); ! foreign = (UnicastConnection)foreignFactory.createConnection(); ! foreign.startHeartbeat(1000); forward(local, foreign); |
From: <ji...@us...> - 2003-06-16 14:56:02
|
Update of /cvsroot/ubermq/jms In directory sc8-pr-cvs1:/tmp/cvs-serv740 Modified Files: build.xml Log Message: clustering connections now perform reconnect logic Index: build.xml =================================================================== RCS file: /cvsroot/ubermq/jms/build.xml,v retrieving revision 1.23 retrieving revision 1.24 diff -C2 -d -r1.23 -r1.24 *** build.xml 14 Jun 2003 19:21:28 -0000 1.23 --- build.xml 16 Jun 2003 14:55:57 -0000 1.24 *************** *** 1,5 **** <project name="ubermq" default="dist" basedir="."> ! <property name="version" value="2.1"/> <property name="src" value="src"/> --- 1,5 ---- <project name="ubermq" default="dist" basedir="."> ! <property name="version" value="2.2"/> <property name="src" value="src"/> |
From: <ji...@us...> - 2003-06-16 00:01:56
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/kernel In directory sc8-pr-cvs1:/tmp/cvs-serv31187/src/com/ubermq/kernel Modified Files: AbstractConnectionInfo.java AcceptThread.java ConnectionInfo.java DatagramSink.java IConnectionInfo.java IConnectionOverflowHandler.java IOverflowHandler.java ReadWriteTransformThread.java Log Message: get rid of kill receiver logic, other modifications to honor TTL and non-persistent messages in overflow situations Index: AbstractConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractConnectionInfo.java,v retrieving revision 1.25 retrieving revision 1.26 diff -C2 -d -r1.25 -r1.26 *** AbstractConnectionInfo.java 14 Jun 2003 23:34:04 -0000 1.25 --- AbstractConnectionInfo.java 16 Jun 2003 00:01:53 -0000 1.26 *************** *** 27,36 **** protected boolean shouldProcess; ! private boolean open; private ByteBuffer readBuffer; private ByteBuffer writeBuffer; ! private Mutex readMutex, writeMutex; private List eventHandlers; --- 27,36 ---- protected boolean shouldProcess; ! private SynchronizedBoolean open; private ByteBuffer readBuffer; private ByteBuffer writeBuffer; ! private Sync readMutex, writeMutex; private List eventHandlers; *************** *** 108,112 **** if (readBuffer != null) { ! readMutex = new Mutex(); } --- 108,112 ---- if (readBuffer != null) { ! readMutex = new ReentrantLock(); } *************** *** 114,118 **** if (writeBuffer != null) { ! writeMutex = new Mutex(); } --- 114,118 ---- if (writeBuffer != null) { ! writeMutex = new ReentrantLock(); } *************** *** 120,124 **** id = String.valueOf(allocateProcessUniqueId()); shouldProcess = true; ! open = true; eventHandlers = new LinkedList(); proc = p; --- 120,124 ---- id = String.valueOf(allocateProcessUniqueId()); shouldProcess = true; ! open = new SynchronizedBoolean(true); eventHandlers = new LinkedList(); proc = p; *************** *** 130,136 **** public void close() { ! if (open) { ! open = false; proc.remove(this); --- 130,136 ---- public void close() { ! if (open.get()) { ! open.set(false); proc.remove(this); *************** *** 146,150 **** public boolean isOpen() { ! return open; } --- 146,150 ---- public boolean isOpen() { ! return open.get(); } *************** *** 214,245 **** */ public void output(IDatagram d, IOverflowHandler h) ! throws IOException { ! if (!open) throw new IOException("not open"); try { ! writeMutex.acquire(); ! ! try { ! // make a sandbox for the output framer ! ByteBuffer output = writeBuffer.slice(); ! factory.outgoing(output, d); - // update the write buffer position - writeBuffer.position(writeBuffer.position() + output.position()); - } - catch(BufferOverflowException boe) - { // we just ran out of space. // handle it ! if (processOverflow(d, h)) { ! writeMutex.release(); ! output(d, h.getRetryHandler()); } } } catch(InterruptedException ie) --- 214,264 ---- */ public void output(IDatagram d, IOverflowHandler h) ! throws IOException, BufferOverflowException { ! if (!open.get()) throw new IOException("not open"); try { ! while(open.get()) { ! try ! { ! writeMutex.acquire(); ! ! // make a sandbox for the output framer ! ByteBuffer output = writeBuffer.slice(); ! factory.outgoing(output, d); ! ! // update the write buffer position ! writeBuffer.position(writeBuffer.position() + output.position()); ! break; ! } ! catch(BufferOverflowException boe) ! { ! } ! finally ! { ! writeMutex.release(); ! } // we just ran out of space. + // please write some bytes. + requestWrite(); + // handle it ! int a = processOverflow(d, h); ! if (a == IOverflowHandler.ACTION_RETRY) { ! h = h.getRetryHandler(); } + else if (a == IOverflowHandler.ACTION_FAIL) + throw new BufferOverflowException(); + else + break; // IGNORE - we quit. } + + // flush the buffers + requestWrite(); } catch(InterruptedException ie) *************** *** 249,259 **** // do anything to the buffer. } - finally - { - writeMutex.release(); - } - - // flush the buffers - requestWrite(); } --- 268,271 ---- *************** *** 263,267 **** * if so. */ ! private boolean processOverflow(IDatagram d, IOverflowHandler h) { --- 275,279 ---- * if so. */ ! private int processOverflow(IDatagram d, IOverflowHandler h) { Index: AcceptThread.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AcceptThread.java,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** AcceptThread.java 10 Jan 2003 22:27:39 -0000 1.14 --- AcceptThread.java 16 Jun 2003 00:01:53 -0000 1.15 *************** *** 18,23 **** private Selector connectSelector; ! private ConnectionList[] acceptedConnections; ! private int n; private IDatagramFactory factory; --- 18,22 ---- private Selector connectSelector; ! private IConnectionInfo.ConnectionAcceptor a; private IDatagramFactory factory; *************** *** 27,52 **** * Instantiate an accept thread. * @param connectSelector the Selector to use to wait for connections. - * @param cxns connections should be added to this ConnectionList as they arrive. - * @param port the port to listen on. - * @param factory the datagram factory to use when creating SocketConnectionInfo objects - * @param incomingProc the processor to use when creating SocketConnectionInfo objects - */ - public AcceptThread(Selector connectSelector, - ConnectionList cxns, - int port, - IDatagramFactory factory, - IMessageProcessor incomingProc) - throws IOException - { - this(connectSelector, - new ConnectionList[] {cxns}, - port, - factory, - incomingProc); - } - - /** - * Instantiate an accept thread. - * @param connectSelector the Selector to use to wait for connections. * @param cxns[] connections are added to these ConnectionList's in a round robin fashion as they arrive. * @param port the port to listen on. --- 26,29 ---- *************** *** 55,59 **** */ public AcceptThread(Selector connectSelector, ! ConnectionList[] cxns, int port, IDatagramFactory factory, --- 32,36 ---- */ public AcceptThread(Selector connectSelector, ! IConnectionInfo.ConnectionAcceptor a, int port, IDatagramFactory factory, *************** *** 64,70 **** this.connectSelector = connectSelector; ! this.acceptedConnections = new ConnectionList[cxns.length]; ! System.arraycopy(cxns, 0, this.acceptedConnections, 0, cxns.length); ! this.n = 0; this.factory = factory; this.incomingProc = incomingProc; --- 41,45 ---- this.connectSelector = connectSelector; ! this.a = a; this.factory = factory; this.incomingProc = incomingProc; *************** *** 127,131 **** SocketConnectionInfo conn = new SocketConnectionInfo(incomingChannel, factory, incomingProc); incomingProc.accept(conn); ! acceptedConnections[n++ % acceptedConnections.length].push(conn); } } --- 102,106 ---- SocketConnectionInfo conn = new SocketConnectionInfo(incomingChannel, factory, incomingProc); incomingProc.accept(conn); ! a.acceptIncomingConnection(conn); } } Index: ConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ConnectionInfo.java,v retrieving revision 1.15 retrieving revision 1.16 diff -C2 -d -r1.15 -r1.16 *** ConnectionInfo.java 14 Jun 2003 23:34:04 -0000 1.15 --- ConnectionInfo.java 16 Jun 2003 00:01:53 -0000 1.16 *************** *** 32,38 **** * to intelligently request write callbacks. */ ! void setSelectionKeys(SelectionKey in, SelectionKey out) { ! this.writeKey = out; } --- 32,39 ---- * to intelligently request write callbacks. */ ! void setSelectionKey(int operation, SelectionKey out) { ! if (operation == SelectionKey.OP_WRITE) ! this.writeKey = out; } Index: DatagramSink.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/DatagramSink.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** DatagramSink.java 18 Oct 2002 16:59:31 -0000 1.1 --- DatagramSink.java 16 Jun 2003 00:01:53 -0000 1.2 *************** *** 1,7 **** package com.ubermq.kernel; ! import com.ubermq.kernel.IDatagram; ! import com.ubermq.kernel.IOverflowHandler; ! import java.io.IOException; /** --- 1,7 ---- package com.ubermq.kernel; ! import com.ubermq.kernel.*; ! import java.io.*; ! import java.nio.*; /** Index: IConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/IConnectionInfo.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** IConnectionInfo.java 18 Oct 2002 16:59:31 -0000 1.9 --- IConnectionInfo.java 16 Jun 2003 00:01:53 -0000 1.10 *************** *** 47,49 **** --- 47,60 ---- */ public void removeEventListener(ConnectionEventListener l); + + /** + * Accepts incoming conenctions + */ + public interface ConnectionAcceptor + { + /** + * Accepts an incoming conenction. + */ + public void acceptIncomingConnection(IConnectionInfo incoming); + } } Index: IConnectionOverflowHandler.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/IConnectionOverflowHandler.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** IConnectionOverflowHandler.java 27 Sep 2002 21:29:03 -0000 1.1 --- IConnectionOverflowHandler.java 16 Jun 2003 00:01:53 -0000 1.2 *************** *** 19,29 **** * @param d the datagram that was being sent when the overflow * occurred. ! * @param connection the connection that was the target of the * <code>output</code> call. * @param messageProcessor the message processor that is calling * this method. */ ! public boolean overflow(IDatagram d, ! IConnectionInfo connection, ! IMessageProcessor messageProcessor); } --- 19,29 ---- * @param d the datagram that was being sent when the overflow * occurred. ! * @param receiver the connection that was the target of the * <code>output</code> call. * @param messageProcessor the message processor that is calling * this method. */ ! public int overflow(IDatagram d, ! IConnectionInfo receiver, ! IMessageProcessor messageProcessor); } Index: IOverflowHandler.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/IOverflowHandler.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** IOverflowHandler.java 30 Jan 2003 14:42:26 -0000 1.5 --- IOverflowHandler.java 16 Jun 2003 00:01:53 -0000 1.6 *************** *** 11,14 **** --- 11,18 ---- extends java.io.Serializable { + public static final int ACTION_RETRY = 1, + ACTION_FAIL = 2, + ACTION_IGNORE = 3; + /** * Handles an overflow situation. *************** *** 18,22 **** * or not. */ ! public boolean overflow(IDatagram d); /** --- 22,26 ---- * or not. */ ! public int overflow(IDatagram d); /** *************** *** 24,28 **** * a datagram. */ ! public boolean overflow(); /** --- 28,32 ---- * a datagram. */ ! public int overflow(); /** Index: ReadWriteTransformThread.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ReadWriteTransformThread.java,v retrieving revision 1.17 retrieving revision 1.18 diff -C2 -d -r1.17 -r1.18 *** ReadWriteTransformThread.java 14 Jun 2003 23:34:04 -0000 1.17 --- ReadWriteTransformThread.java 16 Jun 2003 00:01:53 -0000 1.18 *************** *** 15,21 **** private Selector readSelector; private ConnectionList acceptedConnections; ! ! private Map connectionToChannel; ! private static final int CONNECTION_INITIAL_MAP_SIZE = 50; /** --- 15,19 ---- private Selector readSelector; private ConnectionList acceptedConnections; ! private int operation; /** *************** *** 23,35 **** * @param acceptedConnections The ConnectionList used to add connections for service */ ! public ReadWriteTransformThread(Selector readSelector, ! ConnectionList acceptedConnections) { ! super("Channel-Based NIO Thread"); setDaemon(true); ! this.readSelector = readSelector; this.acceptedConnections = acceptedConnections; ! this.connectionToChannel = new HashMap(CONNECTION_INITIAL_MAP_SIZE); } --- 21,34 ---- * @param acceptedConnections The ConnectionList used to add connections for service */ ! public ReadWriteTransformThread(Selector selector, ! ConnectionList acceptedConnections, ! int operation) { ! super("Channel-Based NIO " + (operation == SelectionKey.OP_READ ? "reader" : "writer")); setDaemon(true); ! this.readSelector = selector; this.acceptedConnections = acceptedConnections; ! this.operation = operation; } *************** *** 47,51 **** { registerNewChannels(); ! int keysReady = readSelector.select(); acceptPendingRequests(); --- 46,50 ---- { registerNewChannels(); ! readSelector.select(); acceptPendingRequests(); *************** *** 77,81 **** if (conn.in() instanceof SelectableChannel) { ! doRegister((SelectableChannel)conn.in(), (SelectableChannel)conn.out(), conn); } else --- 76,82 ---- if (conn.in() instanceof SelectableChannel) { ! doRegister(operation == SelectionKey.OP_WRITE ? ! (SelectableChannel)conn.out() : ! (SelectableChannel)conn.in(), conn); } else *************** *** 86,106 **** } ! private void doRegister(SelectableChannel in, ! SelectableChannel out, IConnectionInfo conn) throws IOException { ! in.configureBlocking(false); ! out.configureBlocking(false); ! final SelectionKey inKey = in.register(readSelector, ! (in == out) ? SelectionKey.OP_READ | SelectionKey.OP_WRITE : SelectionKey.OP_READ, ! conn); ! final SelectionKey outKey = (in == out) ? inKey : ! out.register(readSelector, SelectionKey.OP_WRITE, conn); // tell the connection about the new key if (conn instanceof ConnectionInfo) { ! ((ConnectionInfo)conn).setSelectionKeys(inKey, outKey); } --- 87,103 ---- } ! private void doRegister(SelectableChannel channel, IConnectionInfo conn) throws IOException { ! channel.configureBlocking(false); ! final SelectionKey inKey = channel.register(readSelector, ! operation, ! conn); // tell the connection about the new key if (conn instanceof ConnectionInfo) { ! ((ConnectionInfo)conn).setSelectionKey(operation, inKey); } *************** *** 115,119 **** { inKey.cancel(); - outKey.cancel(); } } --- 112,115 ---- *************** *** 133,138 **** ConnectionInfo conn = (ConnectionInfo)key.attachment(); if (key.isReadable() && ! key.isValid()) { ReadableByteChannel incomingChannel = (ReadableByteChannel)key.channel(); --- 129,141 ---- ConnectionInfo conn = (ConnectionInfo)key.attachment(); + if (key.isWritable() && + key.isValid() && + operation == SelectionKey.OP_WRITE) + { + conn.flush(); + } if (key.isReadable() && ! key.isValid() && ! operation == SelectionKey.OP_READ) { ReadableByteChannel incomingChannel = (ReadableByteChannel)key.channel(); *************** *** 140,148 **** // read the data from the channel conn.readFrom(incomingChannel, key); - } - if (key.isWritable() && - key.isValid()) - { - conn.flush(); } } --- 143,146 ---- |
Update of /cvsroot/ubermq/jms/src/com/ubermq/kernel/overflow In directory sc8-pr-cvs1:/tmp/cvs-serv31187/src/com/ubermq/kernel/overflow Modified Files: AbstractOverflowHandler.java DropIncoming.java ExponentialBackoff.java HandlerChain.java KeepTrying.java KillReceiver.java Log Message: get rid of kill receiver logic, other modifications to honor TTL and non-persistent messages in overflow situations Index: AbstractOverflowHandler.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/overflow/AbstractOverflowHandler.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** AbstractOverflowHandler.java 12 Sep 2002 22:23:10 -0000 1.2 --- AbstractOverflowHandler.java 16 Jun 2003 00:01:53 -0000 1.3 *************** *** 12,25 **** implements IOverflowHandler { ! public boolean overflow(IDatagram d) { return doOverflow(); } ! public boolean overflow() { return doOverflow(); } ! protected abstract boolean doOverflow(); } --- 12,25 ---- implements IOverflowHandler { ! public int overflow(IDatagram d) { return doOverflow(); } ! public int overflow() { return doOverflow(); } ! protected abstract int doOverflow(); } Index: DropIncoming.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/overflow/DropIncoming.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** DropIncoming.java 22 Aug 2002 15:06:03 -0000 1.2 --- DropIncoming.java 16 Jun 2003 00:01:53 -0000 1.3 *************** *** 12,18 **** public class DropIncoming extends AbstractOverflowHandler { ! protected boolean doOverflow() { ! return false; } --- 12,18 ---- public class DropIncoming extends AbstractOverflowHandler { ! protected int doOverflow() { ! return IOverflowHandler.ACTION_IGNORE; } Index: ExponentialBackoff.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/overflow/ExponentialBackoff.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** ExponentialBackoff.java 30 Sep 2002 21:21:59 -0000 1.6 --- ExponentialBackoff.java 16 Jun 2003 00:01:53 -0000 1.7 *************** *** 52,56 **** } ! protected boolean doOverflow() { try { --- 52,56 ---- } ! protected int doOverflow() { try { *************** *** 60,69 **** // if we got interrupted, // let's go ahead and retry ! return true; } if (timeout >= maximumTimeout) ! return !shouldFailIfMaximumReached; ! else return true; } --- 60,69 ---- // if we got interrupted, // let's go ahead and retry ! return IOverflowHandler.ACTION_RETRY; } if (timeout >= maximumTimeout) ! return shouldFailIfMaximumReached ? IOverflowHandler.ACTION_FAIL : IOverflowHandler.ACTION_RETRY; ! else return IOverflowHandler.ACTION_RETRY; } Index: HandlerChain.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/overflow/HandlerChain.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** HandlerChain.java 30 Sep 2002 21:21:59 -0000 1.2 --- HandlerChain.java 16 Jun 2003 00:01:53 -0000 1.3 *************** *** 63,73 **** * a datagram. */ ! public boolean overflow() { ! if (!primary.overflow()) { useSecondaryHandler(); } ! return true; } --- 63,75 ---- * a datagram. */ ! public int overflow() { ! int a = primary.overflow(); ! if (a == IOverflowHandler.ACTION_FAIL) { useSecondaryHandler(); + a = IOverflowHandler.ACTION_RETRY; } ! return a; } *************** *** 82,92 **** * this method. */ ! public boolean overflow(IDatagram d, IConnectionInfo connection, IMessageProcessor messageProcessor) { if (primary instanceof IConnectionOverflowHandler) { ! if (!((IConnectionOverflowHandler)primary).overflow(d, connection, messageProcessor)) { useSecondaryHandler(); } ! return true; } else --- 84,96 ---- * this method. */ ! public int overflow(IDatagram d, IConnectionInfo r, IMessageProcessor messageProcessor) { if (primary instanceof IConnectionOverflowHandler) { ! int a = ((IConnectionOverflowHandler)primary).overflow(d, r, messageProcessor); ! if (a == IOverflowHandler.ACTION_FAIL) { useSecondaryHandler(); + a = IOverflowHandler.ACTION_RETRY; } ! return a; } else *************** *** 101,111 **** * or not. */ ! public boolean overflow(IDatagram d) { ! if (!primary.overflow(d)) { useSecondaryHandler(); } ! return true; } --- 105,117 ---- * or not. */ ! public int overflow(IDatagram d) { ! int a = primary.overflow(d); ! if (a == IOverflowHandler.ACTION_FAIL) { useSecondaryHandler(); + a = IOverflowHandler.ACTION_RETRY; } ! return a; } Index: KeepTrying.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/overflow/KeepTrying.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** KeepTrying.java 21 Oct 2002 21:11:05 -0000 1.1 --- KeepTrying.java 16 Jun 2003 00:01:53 -0000 1.2 *************** *** 24,28 **** } ! protected boolean doOverflow() { if (nRetries > 0) --- 24,28 ---- } ! protected int doOverflow() { if (nRetries > 0) *************** *** 33,41 **** } catch (InterruptedException e) {} ! return true; } else { ! return false; } } --- 33,41 ---- } catch (InterruptedException e) {} ! return IOverflowHandler.ACTION_RETRY; } else { ! return IOverflowHandler.ACTION_FAIL; } } Index: KillReceiver.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/overflow/KillReceiver.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** KillReceiver.java 23 May 2003 00:09:40 -0000 1.6 --- KillReceiver.java 16 Jun 2003 00:01:53 -0000 1.7 *************** *** 16,23 **** { /** ! * Creates an overflow handler that * closes a connection and deregisters it with the * central control processor. ! */ public KillReceiver() { --- 16,23 ---- { /** ! * Creates an overflow handler that * closes a connection and deregisters it with the * central control processor. ! */ public KillReceiver() { *************** *** 38,48 **** * this method. */ ! public boolean overflow(IDatagram d, ! IConnectionInfo connection, ! IMessageProcessor messageProcessor) { ! Utility.getLogger().info("killing connection endpoint " + connection + " to resolve overflow condition."); ! connection.close(); ! return false; } } --- 38,48 ---- * this method. */ ! public int overflow(IDatagram d, ! IConnectionInfo r, ! IMessageProcessor messageProcessor) { ! Utility.getLogger().info("killing connection endpoint " + r + " to resolve overflow condition."); ! r.close(); ! return IOverflowHandler.ACTION_FAIL; } } |
From: <ji...@us...> - 2003-06-16 00:01:56
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/ssl In directory sc8-pr-cvs1:/tmp/cvs-serv31187/src/com/ubermq/jms/server/ssl Modified Files: SSLProtocol.java Log Message: get rid of kill receiver logic, other modifications to honor TTL and non-persistent messages in overflow situations Index: SSLProtocol.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/ssl/SSLProtocol.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** SSLProtocol.java 31 Jan 2003 23:21:16 -0000 1.5 --- SSLProtocol.java 16 Jun 2003 00:01:53 -0000 1.6 *************** *** 36,41 **** private boolean enabled; private IDatagramFactory factory; - private IMessageProcessor dp; - private ConnectionList[] lists; private PipeEndpoint pipeEndpoint; --- 36,39 ---- *************** *** 71,75 **** public void start(IMessageProcessor dp, ! ConnectionList[] lists) throws IOException { --- 69,73 ---- public void start(IMessageProcessor dp, ! IConnectionInfo.ConnectionAcceptor a) throws IOException { *************** *** 81,87 **** this.s = (SSLServerSocket)SSLServerSocketFactory.getDefault().createServerSocket(port); s.setEnabledCipherSuites(s.getSupportedCipherSuites()); - - this.dp = dp; - this.lists = lists; } catch (IOException x) {throw x;} --- 79,82 ---- |
From: <ji...@us...> - 2003-06-16 00:01:55
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc In directory sc8-pr-cvs1:/tmp/cvs-serv31187/src/com/ubermq/jms/server/proc Modified Files: DatagramProc.java TTLOverflowHandler.java Log Message: get rid of kill receiver logic, other modifications to honor TTL and non-persistent messages in overflow situations Index: DatagramProc.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc/DatagramProc.java,v retrieving revision 1.37 retrieving revision 1.38 diff -C2 -d -r1.37 -r1.38 *** DatagramProc.java 14 Jun 2003 23:34:04 -0000 1.37 --- DatagramProc.java 16 Jun 2003 00:01:52 -0000 1.38 *************** *** 11,17 **** import com.ubermq.jms.server.routing.impl.*; import com.ubermq.kernel.*; - import com.ubermq.kernel.overflow.*; import java.io.*; import java.lang.reflect.*; import java.util.*; --- 11,17 ---- import com.ubermq.jms.server.routing.impl.*; import com.ubermq.kernel.*; import java.io.*; import java.lang.reflect.*; + import java.nio.*; import java.util.*; *************** *** 119,127 **** { // use mine, a special TTL aware overflow handler. ! overflow = new HandlerChain(new TTLOverflowHandler(INITIAL_TIMEOUT, ! TIMEOUT_FACTOR, ! MAXIMUM_TIMEOUT, ! true), ! new KillReceiver()); } } --- 119,133 ---- { // use mine, a special TTL aware overflow handler. ! ! // In UMQ 2.3, we now favor receivers in most cases and ! // we kill rogue senders that may be over-publishing. ! // it falls on the sender to regulate itself. ! // ! // The Old behavior can be resurrected by specifying the KillReceiver ! // overflow handler in the OVERFLOW_HANDLER_CLASS runtime parameter. ! overflow = new TTLOverflowHandler(INITIAL_TIMEOUT, ! TIMEOUT_FACTOR, ! MAXIMUM_TIMEOUT, ! true); } } Index: TTLOverflowHandler.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc/TTLOverflowHandler.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** TTLOverflowHandler.java 30 Sep 2002 21:21:59 -0000 1.4 --- TTLOverflowHandler.java 16 Jun 2003 00:01:52 -0000 1.5 *************** *** 1,13 **** package com.ubermq.jms.server.proc; import com.ubermq.kernel.*; ! ! import com.ubermq.jms.server.datagram.IMessageDatagram; ! import com.ubermq.kernel.overflow.ExponentialBackoff; /** * An extension to the exponential backoff overflow handler * that potentially drops messages if we would wait for space longer than ! * their TTL. */ public class TTLOverflowHandler --- 1,16 ---- package com.ubermq.jms.server.proc; + import com.ubermq.jms.server.datagram.*; import com.ubermq.kernel.*; ! import com.ubermq.kernel.overflow.*; ! import javax.jms.*; /** * An extension to the exponential backoff overflow handler * that potentially drops messages if we would wait for space longer than ! * their TTL.<P> ! * ! * This handler also looks at the delivery mode property and assumes ! * negligible TTL for non persistent messages.<P> */ public class TTLOverflowHandler *************** *** 36,47 **** * to our superclass. */ ! public boolean overflow(IDatagram d) { if (d instanceof IMessageDatagram) { int ttl = ((Integer)((IMessageDatagram)d).getStandardProperty(IMessageDatagram.STDPROP_TTL)).intValue(); ! if (ttl > 0 && ! this.timeout >= ttl) ! return false; } --- 39,51 ---- * to our superclass. */ ! public int overflow(IDatagram d) { if (d instanceof IMessageDatagram) { int ttl = ((Integer)((IMessageDatagram)d).getStandardProperty(IMessageDatagram.STDPROP_TTL)).intValue(); ! int dmode = ((Integer)((IMessageDatagram)d).getStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE)).intValue(); ! if (dmode == DeliveryMode.NON_PERSISTENT || ! (ttl > 0 && this.timeout >= ttl)) ! return IOverflowHandler.ACTION_IGNORE; } |
From: <ji...@us...> - 2003-06-16 00:01:55
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/unicast In directory sc8-pr-cvs1:/tmp/cvs-serv31187/src/com/ubermq/jms/client/unicast Modified Files: FailoverConnectionDescriptor.java UnicastClientSession.java Log Message: get rid of kill receiver logic, other modifications to honor TTL and non-persistent messages in overflow situations Index: FailoverConnectionDescriptor.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/unicast/FailoverConnectionDescriptor.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** FailoverConnectionDescriptor.java 30 Jan 2003 14:42:20 -0000 1.4 --- FailoverConnectionDescriptor.java 16 Jun 2003 00:01:52 -0000 1.5 *************** *** 141,145 **** * a datagram. */ ! public boolean overflow() { return delegate.overflow(); --- 141,145 ---- * a datagram. */ ! public int overflow() { return delegate.overflow(); *************** *** 163,167 **** * @throws UnsupportedOperationException */ ! public boolean overflow(IDatagram d) { throw new UnsupportedOperationException(); --- 163,167 ---- * @throws UnsupportedOperationException */ ! public int overflow(IDatagram d) { throw new UnsupportedOperationException(); Index: UnicastClientSession.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/unicast/UnicastClientSession.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** UnicastClientSession.java 10 Jan 2003 22:27:35 -0000 1.2 --- UnicastClientSession.java 16 Jun 2003 00:01:52 -0000 1.3 *************** *** 51,55 **** break; } catch(IOException iox) { ! if (!overflow.overflow()) { throw iox; } else { --- 51,55 ---- break; } catch(IOException iox) { ! if (overflow.overflow() == IOverflowHandler.ACTION_FAIL) { throw iox; } else { |
From: <ji...@us...> - 2003-06-16 00:01:55
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal/impl In directory sc8-pr-cvs1:/tmp/cvs-serv31187/src/com/ubermq/jms/server/journal/impl Modified Files: SimpleJournal.java Log Message: get rid of kill receiver logic, other modifications to honor TTL and non-persistent messages in overflow situations Index: SimpleJournal.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/journal/impl/SimpleJournal.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** SimpleJournal.java 14 Jun 2003 18:14:32 -0000 1.5 --- SimpleJournal.java 16 Jun 2003 00:01:52 -0000 1.6 *************** *** 112,116 **** * our file position, we call the overflow handler. */ ! private synchronized boolean handleOverflow(IDatagram d, int position, IOverflowHandler h) { com.ubermq.Utility.getLogger().fine("handleOverflow enetered."); --- 112,116 ---- * our file position, we call the overflow handler. */ ! private synchronized int handleOverflow(IDatagram d, int position, IOverflowHandler h) { com.ubermq.Utility.getLogger().fine("handleOverflow enetered."); *************** *** 121,128 **** // check if we made any room ! boolean retry = true; int moved = journalBuffer.position() - position; if (moved >= 0 && position > 0) ! retry = h.overflow(d); // ok we now have room. in the process though, we've --- 121,128 ---- // check if we made any room ! int action = IOverflowHandler.ACTION_RETRY; int moved = journalBuffer.position() - position; if (moved >= 0 && position > 0) ! action = h.overflow(d); // ok we now have room. in the process though, we've *************** *** 146,150 **** compact(moved); ! return retry; } --- 146,150 ---- compact(moved); ! return action; } *************** *** 257,261 **** } catch(BufferOverflowException boe) { // try again. ! if (handleOverflow(d, position, h)) output(d, h); } --- 257,261 ---- } catch(BufferOverflowException boe) { // try again. ! if (handleOverflow(d, position, h) == IOverflowHandler.ACTION_RETRY) output(d, h); } |
From: <ji...@us...> - 2003-06-16 00:01:55
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server In directory sc8-pr-cvs1:/tmp/cvs-serv31187/src/com/ubermq/jms/server Modified Files: MessageServer.java Log Message: get rid of kill receiver logic, other modifications to honor TTL and non-persistent messages in overflow situations Index: MessageServer.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/MessageServer.java,v retrieving revision 1.42 retrieving revision 1.43 diff -C2 -d -r1.42 -r1.43 *** MessageServer.java 14 Jun 2003 23:34:03 -0000 1.42 --- MessageServer.java 16 Jun 2003 00:01:52 -0000 1.43 *************** *** 43,549 **** { private boolean started = false; ! // clustering private ClusterPropagation[] clusterCxn = new ClusterPropagation[CLUSTER_MAX_CXNS]; private int clusterCxnCount = 0; ! // datagram processing & protocls private IMessageProcessor datagramProcessor; private Set protocols = new LinkedHashSet(); [...1013 lines suppressed...] ! { ! // fall through ! } ! catch (Exception e) ! { ! try ! { ! Method getInstance = clazz.getMethod(DATAGRAM_INSTANCE_METHOD, null); ! return (IDatagramFactory)getInstance.invoke(null, null); ! } ! catch (Exception e2) ! { ! // fall through ! com.ubermq.Utility.getLogger().throwing("", "", e2); ! } ! } ! ! return DatagramFactory.getInstance(); } } |
From: <ji...@us...> - 2003-06-16 00:01:55
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl In directory sc8-pr-cvs1:/tmp/cvs-serv31187/src/com/ubermq/jms/client/impl Modified Files: AbstractClientSession.java Log Message: get rid of kill receiver logic, other modifications to honor TTL and non-persistent messages in overflow situations Index: AbstractClientSession.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/AbstractClientSession.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** AbstractClientSession.java 14 Jun 2003 23:34:03 -0000 1.8 --- AbstractClientSession.java 16 Jun 2003 00:01:52 -0000 1.9 *************** *** 19,54 **** * The process wide i/o thread */ ! private static ReadWriteTransformThread rwtt; /** * The selector to use in conjunction with the I/O thread. */ ! private static Selector readSelector; /** * A list of connections that are enqueued for service by the I/O thread. */ ! private static ConnectionList cxns; ! static { try { readSelector = Selector.open(); } ! catch (java.io.IOException e) { e.printStackTrace(); } ! cxns = new ConnectionList(readSelector); ! rwtt = new ReadWriteTransformThread(readSelector, ! cxns); ! rwtt.start(); } ! public AbstractClientSession() throws java.io.IOException { } ! /** * Adds the connection to the I/O thread's incoming queue. --- 19,65 ---- * The process wide i/o thread */ ! private static ReadWriteTransformThread read, write; /** * The selector to use in conjunction with the I/O thread. */ ! private static Selector readSelector, writeSelector; /** * A list of connections that are enqueued for service by the I/O thread. */ ! private static ConnectionList readCxns, writeCxns; ! static ! { try { readSelector = Selector.open(); + writeSelector = Selector.open(); } ! catch (java.io.IOException e) ! { e.printStackTrace(); } ! readCxns = new ConnectionList(readSelector); ! writeCxns = new ConnectionList(writeSelector); ! ! read = new ReadWriteTransformThread(readSelector, ! readCxns, ! SelectionKey.OP_READ); ! read.start(); ! ! write = new ReadWriteTransformThread(writeSelector, ! writeCxns, ! SelectionKey.OP_WRITE); ! write.start(); } ! public AbstractClientSession() throws java.io.IOException { } ! /** * Adds the connection to the I/O thread's incoming queue. *************** *** 66,70 **** throws IOException { ! cxns.push(conn); } } --- 77,82 ---- throws IOException { ! readCxns.push(conn); ! writeCxns.push(conn); } } |
From: <ji...@us...> - 2003-06-16 00:01:54
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client In directory sc8-pr-cvs1:/tmp/cvs-serv31187/src/com/ubermq/jms/client Modified Files: PipeConnectionFactory.java Log Message: get rid of kill receiver logic, other modifications to honor TTL and non-persistent messages in overflow situations Index: PipeConnectionFactory.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/PipeConnectionFactory.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** PipeConnectionFactory.java 31 Jan 2003 23:21:13 -0000 1.4 --- PipeConnectionFactory.java 16 Jun 2003 00:01:52 -0000 1.5 *************** *** 15,30 **** { private PipeEndpoint s; ! public PipeConnectionFactory(PipeEndpoint s) { ! this.s = s; } ! public javax.jms.Connection createConnection() ! throws JMSException { ! try { ! return new PipeConnection(s); ! } catch(Exception x) {throw new JMSException(x.getMessage());} } } --- 15,36 ---- { private PipeEndpoint s; ! public PipeConnectionFactory(PipeEndpoint s) { ! this.s = s; } ! public javax.jms.Connection createConnection() ! throws JMSException { ! try ! { ! return new PipeConnection(s); ! } ! catch(Exception x) ! { ! x.printStackTrace(); ! throw new JMSException(x.getMessage()); ! } } } |
Update of /cvsroot/ubermq/jms/src/com/ubermq/kernel In directory sc8-pr-cvs1:/tmp/cvs-serv13736/src/com/ubermq/kernel Modified Files: AbstractConnectionInfo.java ConnectionInfo.java ConnectionList.java ReadWriteTransformThread.java Log Message: move write logic into IO thread Index: AbstractConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractConnectionInfo.java,v retrieving revision 1.24 retrieving revision 1.25 diff -C2 -d -r1.24 -r1.25 *** AbstractConnectionInfo.java 14 Jun 2003 19:21:28 -0000 1.24 --- AbstractConnectionInfo.java 14 Jun 2003 23:34:04 -0000 1.25 *************** *** 234,240 **** catch(BufferOverflowException boe) { - // ok flush some things maybe? - flush(); - // we just ran out of space. // handle it --- 234,237 ---- *************** *** 245,251 **** } } - - // flush the buffers - flush(); } catch(InterruptedException ie) --- 242,245 ---- *************** *** 259,262 **** --- 253,259 ---- writeMutex.release(); } + + // flush the buffers + requestWrite(); } *************** *** 279,283 **** } ! public void requestFlush() throws IOException { --- 276,280 ---- } ! public void flush() throws IOException { *************** *** 285,289 **** { writeMutex.acquire(); ! flush(); } catch(InterruptedException ie) --- 282,286 ---- { writeMutex.acquire(); ! doFlush(); } catch(InterruptedException ie) *************** *** 303,307 **** * mutex on the write buffer. */ ! private void flush() throws IOException { --- 300,304 ---- * mutex on the write buffer. */ ! private void doFlush() throws IOException { *************** *** 318,321 **** --- 315,322 ---- efficientCompact(writeBuffer); } + + // if we're done, cancel the write request + if (writeBuffer.position() == 0) + cancelWriteRequest(); } catch(java.io.IOException iox) *************** *** 342,345 **** --- 343,356 ---- protected abstract int doWrite(ByteBuffer writeBuffer) throws java.io.IOException; + + protected void requestWrite() + throws IOException + { + doFlush(); + } + + protected void cancelWriteRequest() + { + } //////// READ METHODS Index: ConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ConnectionInfo.java,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** ConnectionInfo.java 14 Jun 2003 19:21:28 -0000 1.14 --- ConnectionInfo.java 14 Jun 2003 23:34:04 -0000 1.15 *************** *** 2,5 **** --- 2,6 ---- import com.ubermq.kernel.event.*; + import java.io.*; import java.nio.*; import java.nio.channels.*; *************** *** 13,17 **** private WritableByteChannel out; private ReadableByteChannel in; ! private SelectionKey key; /** --- 14,18 ---- private WritableByteChannel out; private ReadableByteChannel in; ! private SelectionKey writeKey; /** *************** *** 31,37 **** * to intelligently request write callbacks. */ ! void setSelectionKey(SelectionKey key) { ! this.key = key; } --- 32,38 ---- * to intelligently request write callbacks. */ ! void setSelectionKeys(SelectionKey in, SelectionKey out) { ! this.writeKey = out; } *************** *** 60,95 **** } ! public int doWrite(ByteBuffer writeBuffer) ! throws java.io.IOException ! { ! int n = out().write(writeBuffer); ! ! // if we have more bytes, register an interest ! // for write callback with the selection key. ! // if not, explicitly cancel the write callback. ! if (key != null) { ! if (writeBuffer.hasRemaining()) ! { ! key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); ! key.selector().wakeup(); ! } ! else ! { ! key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); ! } } ! ! return n; ! } ! /** * Reads from the specified byte channel into the read buffer. This method * is usually called by a dedicated I/O service thread when the channel * has indicated that it has data to be consumed. */ ! void readFrom(ReadableByteChannel channel, SelectionKey key) ! { ByteBuffer readBuffer = null; try --- 61,96 ---- } ! protected void requestWrite() ! throws IOException ! { ! if (writeKey != null) { ! writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); ! writeKey.selector().wakeup(); } ! } ! protected void cancelWriteRequest() ! { ! if (writeKey != null) ! { ! writeKey.interestOps(writeKey.interestOps() & ~SelectionKey.OP_WRITE); ! } ! } ! ! public int doWrite(ByteBuffer writeBuffer) ! throws java.io.IOException ! { ! return out().write(writeBuffer); ! } ! ! /** * Reads from the specified byte channel into the read buffer. This method * is usually called by a dedicated I/O service thread when the channel * has indicated that it has data to be consumed. */ ! void readFrom(ReadableByteChannel channel, SelectionKey key) ! { ByteBuffer readBuffer = null; try *************** *** 125,128 **** // process the data processData(); ! } } --- 126,129 ---- // process the data processData(); ! } } Index: ConnectionList.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ConnectionList.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** ConnectionList.java 16 Sep 2002 01:43:35 -0000 1.5 --- ConnectionList.java 14 Jun 2003 23:34:04 -0000 1.6 *************** *** 1,45 **** ! package com.ubermq.kernel; ! ! import java.nio.channels.*; ! import java.util.*; ! ! /** ! * A simple selector-aware queue of connections that are waiting to ! * be serviced by an I/O thread. ! */ ! public class ConnectionList { ! private LinkedList list = new LinkedList(); ! private Selector selectorToNotify; ! ! /** ! * Creates a ConnectionList. When a connection is added to the list ! * the given selector's <code>wakeup</code> method is called. ! */ ! public ConnectionList(Selector sel) { ! this.selectorToNotify = sel; ! } ! ! /** ! * Adds a connection to the connection list for ultimate use ! * by an I/O thread that is selecting on the selector specified at ! * creation time. ! */ ! public synchronized void push(IConnectionInfo newlyConnectedInfo) { ! list.add(newlyConnectedInfo); ! selectorToNotify.wakeup(); ! Thread.yield(); ! } ! ! /** ! * Removes a connection from the list and provides it to the caller. ! * This should be called by an I/O thread when its selector is woken up. ! * @return an IConnectionInfo instance representing the new connection. ! */ ! public synchronized Object removeFirst() { ! if(list.size() == 0) ! return null; ! ! return list.removeFirst(); ! } ! } ! --- 1,45 ---- ! package com.ubermq.kernel; ! ! import java.nio.channels.*; ! import java.util.*; ! ! /** ! * A simple selector-aware queue of connections that are waiting to ! * be serviced by an I/O thread. ! */ ! public class ConnectionList { ! private LinkedList list = new LinkedList(); ! private Selector selectorToNotify; ! ! /** ! * Creates a ConnectionList. When a connection is added to the list ! * the given selector's <code>wakeup</code> method is called. ! */ ! public ConnectionList(Selector sel) { ! this.selectorToNotify = sel; ! } ! ! /** ! * Adds a connection to the connection list for ultimate use ! * by an I/O thread that is selecting on the selector specified at ! * creation time. ! */ ! public synchronized void push(IConnectionInfo newlyConnectedInfo) { ! list.add(newlyConnectedInfo); ! selectorToNotify.wakeup(); ! Thread.yield(); ! } ! ! /** ! * Removes a connection from the list and provides it to the caller. ! * This should be called by an I/O thread when its selector is woken up. ! * @return an IConnectionInfo instance representing the new connection. ! */ ! public synchronized Object removeFirst() { ! if(list.size() == 0) ! return null; ! ! return list.removeFirst(); ! } ! } ! Index: ReadWriteTransformThread.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ReadWriteTransformThread.java,v retrieving revision 1.16 retrieving revision 1.17 diff -C2 -d -r1.16 -r1.17 *** ReadWriteTransformThread.java 14 Jun 2003 19:21:28 -0000 1.16 --- ReadWriteTransformThread.java 14 Jun 2003 23:34:04 -0000 1.17 *************** *** 15,140 **** private Selector readSelector; private ConnectionList acceptedConnections; ! private Map connectionToChannel; private static final int CONNECTION_INITIAL_MAP_SIZE = 50; ! /** ! * @param readSelector The selector indicating READ readiness. ! * @param acceptedConnections The ConnectionList used to add connections for service ! */ public ReadWriteTransformThread(Selector readSelector, ! ConnectionList acceptedConnections) { ! super("Channel-Based NIO Thread"); ! setDaemon(true); ! ! this.readSelector = readSelector; ! this.acceptedConnections = acceptedConnections; ! this.connectionToChannel = new HashMap(CONNECTION_INITIAL_MAP_SIZE); } ! /** ! * Executes the I/O thread. This will service connections ! * that are ready for reading as indicated by the selector, and ! * will add new connections as they become available in the ! * connection list. ! */ public void run() { ! while(!isInterrupted()) ! { ! try ! { ! registerNewChannels(); ! int keysReady = readSelector.select(); ! ! acceptPendingRequests(); ! } ! catch(Exception ex) ! { ! com.ubermq.Utility.getLogger().throwing("", "", ex); ! } ! } ! ! // kill all associated connections ! Iterator iter = readSelector.keys().iterator(); ! while (iter.hasNext()) ! { ! SelectionKey key = (SelectionKey)iter.next(); ! key.cancel(); ! ! ConnectionInfo conn = (ConnectionInfo)key.attachment(); ! conn.close(); ! } } ! private void registerNewChannels() throws IOException { ! Object c; ! while(null != (c = acceptedConnections.removeFirst())) ! { ! ConnectionInfo conn = (ConnectionInfo)c; ! if (conn.in() instanceof SelectableChannel) { ! doRegister((SelectableChannel)conn.in(), conn); ! } else { ! // ignore the new connection ! } ! } } ! ! private void doRegister(SelectableChannel channel, IConnectionInfo conn) ! throws IOException { ! channel.configureBlocking(false); ! ! // register the READ operation for the channel, and ! // attach the IConnectionInfo to the Key. ! final SelectionKey theKey = channel.register(readSelector, SelectionKey.OP_READ); ! theKey.attach(conn); // tell the connection about the new key if (conn instanceof ConnectionInfo) { ! ((ConnectionInfo)conn).setSelectionKey(theKey); } ! ! // register a close event handler, ! // so we can dispose of the connection selection key ! // when it is closed. ! conn.addEventListener(new ConnectionEventListener() { ! public void connectionEvent(ConnectionEvent e) ! { ! if (e.getEventCode() == ConnectionEvent.CONNECTION_CLOSED) ! theKey.cancel(); ! } ! }); ! } ! ! private synchronized void acceptPendingRequests() throws IOException ! { ! Set readyKeys = readSelector.selectedKeys(); ! ! for(Iterator i = readyKeys.iterator(); i.hasNext(); ) ! { ! SelectionKey key = (SelectionKey)i.next(); ! i.remove(); ! ! ConnectionInfo conn = (ConnectionInfo)key.attachment(); ! ! ReadableByteChannel incomingChannel = (ReadableByteChannel)key.channel(); ! if (key.isReadable() && ! key.isValid()) ! { ! // read the data from the channel ! conn.readFrom(incomingChannel, key); ! } if (key.isWritable() && key.isValid()) { ! conn.requestFlush(); } ! } ! } ! } --- 15,152 ---- private Selector readSelector; private ConnectionList acceptedConnections; ! private Map connectionToChannel; private static final int CONNECTION_INITIAL_MAP_SIZE = 50; ! /** ! * @param readSelector The selector indicating READ readiness. ! * @param acceptedConnections The ConnectionList used to add connections for service ! */ public ReadWriteTransformThread(Selector readSelector, ! ConnectionList acceptedConnections) { ! super("Channel-Based NIO Thread"); ! setDaemon(true); ! ! this.readSelector = readSelector; ! this.acceptedConnections = acceptedConnections; ! this.connectionToChannel = new HashMap(CONNECTION_INITIAL_MAP_SIZE); } ! /** ! * Executes the I/O thread. This will service connections ! * that are ready for reading as indicated by the selector, and ! * will add new connections as they become available in the ! * connection list. ! */ public void run() { ! while(!isInterrupted()) ! { ! try ! { ! registerNewChannels(); ! int keysReady = readSelector.select(); ! ! acceptPendingRequests(); ! } ! catch(Exception ex) ! { ! com.ubermq.Utility.getLogger().throwing("", "", ex); ! } ! } ! ! // kill all associated connections ! Iterator iter = readSelector.keys().iterator(); ! while (iter.hasNext()) ! { ! SelectionKey key = (SelectionKey)iter.next(); ! key.cancel(); ! ! ConnectionInfo conn = (ConnectionInfo)key.attachment(); ! conn.close(); ! } } ! private void registerNewChannels() throws IOException { ! Object c; ! while(null != (c = acceptedConnections.removeFirst())) ! { ! ConnectionInfo conn = (ConnectionInfo)c; ! if (conn.in() instanceof SelectableChannel) ! { ! doRegister((SelectableChannel)conn.in(), (SelectableChannel)conn.out(), conn); ! } ! else ! { ! // ignore the new connection ! } ! } } ! ! private void doRegister(SelectableChannel in, ! SelectableChannel out, ! IConnectionInfo conn) ! throws IOException { ! in.configureBlocking(false); ! out.configureBlocking(false); ! final SelectionKey inKey = in.register(readSelector, ! (in == out) ? SelectionKey.OP_READ | SelectionKey.OP_WRITE : SelectionKey.OP_READ, ! conn); ! final SelectionKey outKey = (in == out) ? inKey : ! out.register(readSelector, SelectionKey.OP_WRITE, conn); // tell the connection about the new key if (conn instanceof ConnectionInfo) { ! ((ConnectionInfo)conn).setSelectionKeys(inKey, outKey); } ! ! // register a close event handler, ! // so we can dispose of the connection selection key ! // when it is closed. ! conn.addEventListener(new ConnectionEventListener() ! { ! public void connectionEvent(ConnectionEvent e) ! { ! if (e.getEventCode() == ConnectionEvent.CONNECTION_CLOSED) ! { ! inKey.cancel(); ! outKey.cancel(); ! } ! } ! }); ! } ! ! private synchronized void acceptPendingRequests() ! throws IOException ! { ! Set readyKeys = readSelector.selectedKeys(); ! ! for(Iterator i = readyKeys.iterator(); i.hasNext(); ) ! { ! SelectionKey key = (SelectionKey)i.next(); ! i.remove(); ! ! ConnectionInfo conn = (ConnectionInfo)key.attachment(); ! ! if (key.isReadable() && ! key.isValid()) ! { ! ReadableByteChannel incomingChannel = (ReadableByteChannel)key.channel(); ! ! // read the data from the channel ! conn.readFrom(incomingChannel, key); ! } if (key.isWritable() && key.isValid()) { ! conn.flush(); } ! } ! } ! } |
From: <ji...@us...> - 2003-06-14 23:34:07
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc In directory sc8-pr-cvs1:/tmp/cvs-serv13736/src/com/ubermq/jms/server/proc Modified Files: DatagramProc.java Log Message: move write logic into IO thread Index: DatagramProc.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/proc/DatagramProc.java,v retrieving revision 1.36 retrieving revision 1.37 diff -C2 -d -r1.36 -r1.37 *** DatagramProc.java 30 Jan 2003 22:23:01 -0000 1.36 --- DatagramProc.java 14 Jun 2003 23:34:04 -0000 1.37 *************** *** 28,877 **** private static final long MAXIMUM_TIMEOUT = Long.valueOf(Configurator.getProperty(ServerConfig.DGP_MAXIMUM_TIMEOUT, "5000")).longValue(); private static final int TIMEOUT_FACTOR = Integer.valueOf(Configurator.getProperty(ServerConfig.DGP_BACKOFF_MULTIPLIER, "2")).intValue(); ! // whether the server should send ACK datagrams. private static final boolean shouldSendAck = Boolean.valueOf(Configurator.getProperty(ServerConfig.DGP_SHOULD_SEND_ACKS, "true")).booleanValue(); ! // or, a user-defined overflow handler available on classpath. private static final String OVERFLOW_HANDLER_CLASS = Configurator.getProperty(ServerConfig.DGP_OVERFLOW_HANDLER, ""); private static final String OVERFLOW_HANDLER_INIT = Configurator.getProperty(ServerConfig.DGP_OVERFLOW_HANDLER_INIT, ""); ! [...1715 lines suppressed...] ! sb.append("\n\tLatency: " + nf.format(accumLatency / ((double)nMessagesIn)) + " ms"); ! sb.append("\n\tThroughput: " + nf.format(1000 * ((double)nMessagesIn) / accumLatency) + " msg/s"); ! sb.append("\n\tUtilization: " + pf.format( accumLatency / (double)(time - startTime) )); ! } ! ! sb.append("\n\n\tUptime: " + formatMillis(uptime)); ! ! return sb.toString(); } ! private String formatMillis(long ms) { ! java.text.NumberFormat tf = java.text.NumberFormat.getNumberInstance(); ! tf.setMinimumIntegerDigits(2); ! tf.setMaximumFractionDigits(3); ! ! return tf.format( ms / 3600 ) + "h " + tf.format( (ms % 3600) / 60 ) + "m"; } ! } |
From: <ji...@us...> - 2003-06-14 23:34:07
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server In directory sc8-pr-cvs1:/tmp/cvs-serv13736/src/com/ubermq/jms/server Modified Files: MessageServer.java Log Message: move write logic into IO thread Index: MessageServer.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/MessageServer.java,v retrieving revision 1.41 retrieving revision 1.42 diff -C2 -d -r1.41 -r1.42 *** MessageServer.java 1 Feb 2003 16:46:43 -0000 1.41 --- MessageServer.java 14 Jun 2003 23:34:03 -0000 1.42 *************** *** 326,329 **** --- 326,330 ---- IDatagramFactory df, IMessageProcessor dp) + throws IOException { // create a connection info that represents the server |
From: <ji...@us...> - 2003-06-14 23:34:06
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl In directory sc8-pr-cvs1:/tmp/cvs-serv13736/src/com/ubermq/jms/client/impl Modified Files: AbstractClientSession.java Log Message: move write logic into IO thread Index: AbstractClientSession.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/impl/AbstractClientSession.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** AbstractClientSession.java 24 Jan 2003 15:34:44 -0000 1.7 --- AbstractClientSession.java 14 Jun 2003 23:34:03 -0000 1.8 *************** *** 2,8 **** import com.ubermq.jms.client.*; - import com.ubermq.kernel.*; ! import java.net.*; import java.nio.channels.*; --- 2,7 ---- import com.ubermq.jms.client.*; import com.ubermq.kernel.*; ! import java.io.*; import java.nio.channels.*; *************** *** 56,59 **** --- 55,59 ---- */ public void started(IConnectionInfo c) + throws IOException { addConnection((ConnectionInfo)c); *************** *** 64,67 **** --- 64,68 ---- */ public void addConnection(ConnectionInfo conn) + throws IOException { cxns.push(conn); |
From: <ji...@us...> - 2003-06-14 23:34:06
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client In directory sc8-pr-cvs1:/tmp/cvs-serv13736/src/com/ubermq/jms/client Modified Files: IClientSession.java Log Message: move write logic into IO thread Index: IClientSession.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/IClientSession.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** IClientSession.java 19 Sep 2002 17:30:30 -0000 1.4 --- IClientSession.java 14 Jun 2003 23:34:03 -0000 1.5 *************** *** 2,5 **** --- 2,6 ---- import com.ubermq.kernel.*; + import java.io.*; /** *************** *** 24,28 **** ConnectionDescriptor descriptor, IMessageProcessor proc) ! throws java.io.IOException; /** --- 25,29 ---- ConnectionDescriptor descriptor, IMessageProcessor proc) ! throws java.io.IOException; /** *************** *** 30,33 **** * should commence. */ ! public void started(IConnectionInfo c); } --- 31,35 ---- * should commence. */ ! public void started(IConnectionInfo c) ! throws IOException; } |
Update of /cvsroot/ubermq/jms/src/com/ubermq/kernel In directory sc8-pr-cvs1:/tmp/cvs-serv15283/src/com/ubermq/kernel Modified Files: AbstractConnectionInfo.java AbstractDatagram.java ConnectionInfo.java ReadWriteTransformThread.java Log Message: actual fix for partial writes due to socket buffer size limits Index: AbstractConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractConnectionInfo.java,v retrieving revision 1.23 retrieving revision 1.24 diff -C2 -d -r1.23 -r1.24 *** AbstractConnectionInfo.java 14 Jun 2003 18:14:32 -0000 1.23 --- AbstractConnectionInfo.java 14 Jun 2003 19:21:28 -0000 1.24 *************** *** 315,319 **** { writeBuffer.flip(); ! int n = doWrite(writeBuffer); efficientCompact(writeBuffer); } --- 315,319 ---- { writeBuffer.flip(); ! doWrite(writeBuffer); efficientCompact(writeBuffer); } Index: AbstractDatagram.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractDatagram.java,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** AbstractDatagram.java 14 Jun 2003 18:14:32 -0000 1.14 --- AbstractDatagram.java 14 Jun 2003 19:21:28 -0000 1.15 *************** *** 75,79 **** /** * Writes a String to a buffer in the default encoding, preceded by a ! * two byte length. */ protected static void writePascalString(String sz, ByteBuffer bb) --- 75,79 ---- /** * Writes a String to a buffer in the default encoding, preceded by a ! * int length. */ protected static void writePascalString(String sz, ByteBuffer bb) Index: ConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ConnectionInfo.java,v retrieving revision 1.13 retrieving revision 1.14 diff -C2 -d -r1.13 -r1.14 *** ConnectionInfo.java 14 Jun 2003 18:14:32 -0000 1.13 --- ConnectionInfo.java 14 Jun 2003 19:21:28 -0000 1.14 *************** *** 13,97 **** private WritableByteChannel out; private ReadableByteChannel in; ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! */ public ConnectionInfo(IMessageProcessor p, ! IDatagramFactory f) { ! super(p,f); } ! public void start() { ! shouldProcess = true; } ! public void stop() { ! shouldProcess = false; } ! public ReadableByteChannel in() {return in;} public WritableByteChannel out() {return out;} ! /** ! * Attaches an input and output channel to this connection. They may be ! * the same object, if the channel implements both readable and writable ! * interfaces. ! */ public void attach(ReadableByteChannel in, WritableByteChannel out) { ! this.in = in; ! this.out = out; } ! public int doWrite(ByteBuffer writeBuffer) ! throws java.io.IOException { ! return out().write(writeBuffer); } ! /** ! * Reads from the specified byte channel into the read buffer. This method ! * is usually called by a dedicated I/O service thread when the channel ! * has indicated that it has data to be consumed. ! */ void readFrom(ReadableByteChannel channel, ! SelectionKey key) { ! ByteBuffer readBuffer = null; ! try ! { ! readBuffer = getReadBuffer(); ! int n = channel.read(readBuffer); ! ! // if were are at End Of Stream, we cancel ! // the read selection key. ! if (n == -1) { ! key.cancel(); ! ! // close the channels ! close(); ! } ! } ! catch(java.io.IOException iox) ! { ! sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); ! close(); ! } ! catch(InterruptedException ie) { ! // return to caller ! return; ! } ! finally { ! releaseReadBuffer(readBuffer); ! } ! ! // process the data ! processData(); } } --- 13,128 ---- private WritableByteChannel out; private ReadableByteChannel in; ! private SelectionKey key; ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! */ public ConnectionInfo(IMessageProcessor p, ! IDatagramFactory f) { ! super(p,f); } ! ! /** ! * Sets the selection key for this connection. It is used ! * to intelligently request write callbacks. ! */ ! void setSelectionKey(SelectionKey key) ! { ! this.key = key; ! } ! public void start() { ! shouldProcess = true; } ! public void stop() { ! shouldProcess = false; } ! public ReadableByteChannel in() {return in;} public WritableByteChannel out() {return out;} ! /** ! * Attaches an input and output channel to this connection. They may be ! * the same object, if the channel implements both readable and writable ! * interfaces. ! */ public void attach(ReadableByteChannel in, WritableByteChannel out) { ! this.in = in; ! this.out = out; } ! public int doWrite(ByteBuffer writeBuffer) ! throws java.io.IOException { ! int n = out().write(writeBuffer); ! ! // if we have more bytes, register an interest ! // for write callback with the selection key. ! // if not, explicitly cancel the write callback. ! if (key != null) ! { ! if (writeBuffer.hasRemaining()) ! { ! key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); ! key.selector().wakeup(); ! } ! else ! { ! key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); ! } ! } ! ! return n; } ! /** ! * Reads from the specified byte channel into the read buffer. This method ! * is usually called by a dedicated I/O service thread when the channel ! * has indicated that it has data to be consumed. ! */ void readFrom(ReadableByteChannel channel, ! SelectionKey key) { ! ByteBuffer readBuffer = null; ! try ! { ! readBuffer = getReadBuffer(); ! int n = channel.read(readBuffer); ! ! // if were are at End Of Stream, we cancel ! // the read selection key. ! if (n == -1) ! { ! key.cancel(); ! ! // close the channels ! close(); ! } ! } ! catch(java.io.IOException iox) ! { ! sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); ! close(); ! } ! catch(InterruptedException ie) ! { ! // return to caller ! return; ! } ! finally ! { ! releaseReadBuffer(readBuffer); ! } ! ! // process the data ! processData(); } } Index: ReadWriteTransformThread.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ReadWriteTransformThread.java,v retrieving revision 1.15 retrieving revision 1.16 diff -C2 -d -r1.15 -r1.16 *** ReadWriteTransformThread.java 14 Jun 2003 18:14:32 -0000 1.15 --- ReadWriteTransformThread.java 14 Jun 2003 19:21:28 -0000 1.16 *************** *** 90,95 **** // register the READ operation for the channel, and // attach the IConnectionInfo to the Key. ! final SelectionKey theKey = channel.register(readSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); theKey.attach(conn); // register a close event handler, --- 90,101 ---- // register the READ operation for the channel, and // attach the IConnectionInfo to the Key. ! final SelectionKey theKey = channel.register(readSelector, SelectionKey.OP_READ); theKey.attach(conn); + + // tell the connection about the new key + if (conn instanceof ConnectionInfo) + { + ((ConnectionInfo)conn).setSelectionKey(theKey); + } // register a close event handler, *************** *** 123,127 **** conn.readFrom(incomingChannel, key); } ! if (key.isWritable()) { conn.requestFlush(); --- 129,134 ---- conn.readFrom(incomingChannel, key); } ! if (key.isWritable() && ! key.isValid()) { conn.requestFlush(); |
From: <ji...@us...> - 2003-06-14 19:21:31
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/datagram/impl In directory sc8-pr-cvs1:/tmp/cvs-serv15283/src/com/ubermq/jms/server/datagram/impl Modified Files: DatagramTestCase.java Log Message: actual fix for partial writes due to socket buffer size limits Index: DatagramTestCase.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/datagram/impl/DatagramTestCase.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** DatagramTestCase.java 10 Oct 2002 14:32:34 -0000 1.5 --- DatagramTestCase.java 14 Jun 2003 19:21:28 -0000 1.6 *************** *** 168,173 **** Assert.assertEquals(smd.getStandardProperty(IMessageDatagram.STDPROP_TTL), new Integer(1000)); ! // make sure the body is null and that the props have not been processed. ! Assert.assertNull(smd.getStandardProperty(IMessageDatagram.STDPROP_BODY)); Assert.assertTrue(!smd.hasProcessedProperties()); --- 168,173 ---- Assert.assertEquals(smd.getStandardProperty(IMessageDatagram.STDPROP_TTL), new Integer(1000)); ! // make sure the body is not null and that the props have not been processed. ! Assert.assertNotNull(smd.getStandardProperty(IMessageDatagram.STDPROP_BODY)); Assert.assertTrue(!smd.hasProcessedProperties()); |
From: <ji...@us...> - 2003-06-14 19:21:31
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/client/test In directory sc8-pr-cvs1:/tmp/cvs-serv15283/src/com/ubermq/jms/client/test Modified Files: RegressionTestCase.java Log Message: actual fix for partial writes due to socket buffer size limits Index: RegressionTestCase.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/client/test/RegressionTestCase.java,v retrieving revision 1.28 retrieving revision 1.29 diff -C2 -d -r1.28 -r1.29 *** RegressionTestCase.java 14 Jun 2003 18:14:32 -0000 1.28 --- RegressionTestCase.java 14 Jun 2003 19:21:28 -0000 1.29 *************** *** 70,74 **** public static final String ASYNC_TOPIC = "AsyncTopic"; ! public void xtestPubSub() throws Exception { --- 70,74 ---- public static final String ASYNC_TOPIC = "AsyncTopic"; ! public void testPubSub() throws Exception { *************** *** 151,155 **** } ! public void xtestMessages() throws Exception { --- 151,155 ---- } ! public void testMessages() throws Exception { *************** *** 257,261 **** ! public void xtestStress() throws Exception { --- 257,261 ---- ! public void testStress() throws Exception { *************** *** 263,267 **** } ! public void xtestAsyncSubscriber() throws Exception { --- 263,267 ---- } ! public void testAsyncSubscriber() throws Exception { *************** *** 300,304 **** } ! public void xtestAsyncClose() throws Exception { --- 300,304 ---- } ! public void testAsyncClose() throws Exception { *************** *** 332,336 **** } ! public void xtestJMS11() throws JMSException { --- 332,336 ---- } ! public void testJMS11() throws JMSException { *************** *** 377,381 **** } ! public void xtestNamespace() throws JMSException { --- 377,381 ---- } ! public void testNamespace() throws JMSException { *************** *** 405,409 **** } ! public void xtestSelectors() throws Exception { --- 405,409 ---- } ! public void testSelectors() throws Exception { *************** *** 441,445 **** * correctly set. */ ! public void xtestPublishSenderIdentifiers() throws JMSException { --- 441,445 ---- * correctly set. */ ! public void testPublishSenderIdentifiers() throws JMSException { *************** *** 472,476 **** * and republishing it works as intended. */ ! public void xtestResending() throws JMSException { --- 472,476 ---- * and republishing it works as intended. */ ! public void testResending() throws JMSException { *************** *** 505,509 **** * Tests pipe connections and makes sure they work w/ themselves, and others. */ ! public void xtestPipes() throws JMSException { --- 505,509 ---- * Tests pipe connections and makes sure they work w/ themselves, and others. */ ! public void testPipes() throws JMSException { *************** *** 529,533 **** * Tests SSL connections and makes sure they work w/ themselves, and others. */ ! public void xtestSSL() throws JMSException { --- 529,533 ---- * Tests SSL connections and makes sure they work w/ themselves, and others. */ ! public void testSSL() throws JMSException { *************** *** 546,550 **** * Tests Clustering. */ ! public void xtestClustering() throws JMSException { --- 546,550 ---- * Tests Clustering. */ ! public void testClustering() throws JMSException { *************** *** 576,580 **** * Tests queue APIs. */ ! public void xtestQueues() throws Exception { --- 576,580 ---- * Tests queue APIs. */ ! public void testQueues() throws Exception { *************** *** 601,605 **** * Tests overflow handling for catatonic message listeners. */ ! public void xtestOverflows() throws Exception { --- 601,605 ---- * Tests overflow handling for catatonic message listeners. */ ! public void testOverflows() throws Exception { *************** *** 683,687 **** * Test event handling. */ ! public void xtestEvents() throws Exception { --- 683,687 ---- * Test event handling. */ ! public void testEvents() throws Exception { *************** *** 726,730 **** } ! public void xtestMulticast() throws Exception { --- 726,730 ---- } ! public void testMulticast() throws Exception { |
From: <ji...@us...> - 2003-06-14 19:21:30
|
Update of /cvsroot/ubermq/jms In directory sc8-pr-cvs1:/tmp/cvs-serv15283 Modified Files: build.xml Log Message: actual fix for partial writes due to socket buffer size limits Index: build.xml =================================================================== RCS file: /cvsroot/ubermq/jms/build.xml,v retrieving revision 1.22 retrieving revision 1.23 diff -C2 -d -r1.22 -r1.23 *** build.xml 14 Jun 2003 18:14:31 -0000 1.22 --- build.xml 14 Jun 2003 19:21:28 -0000 1.23 *************** *** 56,59 **** --- 56,60 ---- <java classpathref="run.path" classname="com.ubermq.jms.server.MessageServer" + dir="${bin}" fork="true"/> </target> |
Update of /cvsroot/ubermq/jms/src/com/ubermq/kernel In directory sc8-pr-cvs1:/tmp/cvs-serv7134/src/com/ubermq/kernel Modified Files: AbstractConnectionInfo.java AbstractDatagram.java ConnectionInfo.java ReadWriteTransformThread.java Log Message: fix bugs in large message support Index: AbstractConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractConnectionInfo.java,v retrieving revision 1.22 retrieving revision 1.23 diff -C2 -d -r1.22 -r1.23 *** AbstractConnectionInfo.java 10 Jan 2003 22:27:38 -0000 1.22 --- AbstractConnectionInfo.java 14 Jun 2003 18:14:32 -0000 1.23 *************** *** 19,457 **** { private String id; ! /** ! * Indicates whether datagrams read from this connection ! * should be passed to the message processor. When set to false, ! * incoming datagrams are discarded. ! */ protected boolean shouldProcess; ! private boolean open; ! private ByteBuffer readBuffer; private ByteBuffer writeBuffer; ! private Mutex readMutex, writeMutex; ! private List eventHandlers; ! // statics private static long nextId = 2; ! /** ! * The read/write buffer size, as specified by the global property configurator. ! */ protected static final int MAX_READ = Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_BUFFER_SIZE, ! "1048576")).intValue(); ! /** ! * The flush threshold, as specified by the global configurator. ! */ protected static final int FLUSH_BUFFER_THRESHOLD = MAX_READ / Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_FLUSH_DIVISOR, "2")).intValue(); ! // the message processor. very important. private final IMessageProcessor proc; ! // the datagram factory. also very important. private final IDatagramFactory factory; ! /** ! * Uses buffer sizes from the global configurator. ! * ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! */ public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f) { ! this(p, ! f, ! MAX_READ, ! MAX_READ); } ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! * @param rbuf the size, in bytes, of the read buffer. ! * @param wbuf the size, in bytes, of the write buffer. ! */ public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f, ! int rbuf, ! int wbuf) { ! this(p, ! f, ! (rbuf > 0) ? ByteBuffer.allocateDirect(rbuf) : null, ! (wbuf > 0) ? ByteBuffer.allocateDirect(wbuf) : null); } ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! * @param r the actual ByteBuffer used as the read buffer ! * @param w the actual ByteBuffer used as the write buffer ! */ public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f, ! ByteBuffer r, ! ByteBuffer w) { ! readBuffer = r; ! if (readBuffer != null) { ! readMutex = new Mutex(); ! } ! ! writeBuffer = w; ! if (writeBuffer != null) { ! writeMutex = new Mutex(); ! } ! ! this.factory = f; ! id = String.valueOf(allocateProcessUniqueId()); ! shouldProcess = true; ! open = true; ! eventHandlers = new LinkedList(); ! proc = p; ! ! // send connected event ! sendEvent(ConnectionEvent.CONNECTION_CONNECTED); } ! public void close() { ! if (open) ! { ! open = false; ! proc.remove(this); ! ! // make sure everything is out of the ! // wbuffer ! assert writeBuffer.position() == 0 : "buffer not empty"; ! ! // closed normally (or abnormally) ! sendEvent(ConnectionEvent.CONNECTION_CLOSED); ! } } ! public boolean isOpen() { ! return open; } ! public void addEventListener(ConnectionEventListener l) { ! eventHandlers.add(l); } ! public void removeEventListener(ConnectionEventListener l) { ! eventHandlers.remove(l); } ! /** ! * Sends an event to all the registered event listeners. ! * ! * @param event the event object ! */ protected void sendEvent(ConnectionEvent event) { ! Utility.getLogger().fine("sending connection event " + event); ! ! Iterator iter = eventHandlers.iterator(); ! while (iter.hasNext()) ! { ! ConnectionEventListener l = (ConnectionEventListener)iter.next(); ! try { ! l.connectionEvent(event); ! } catch(RuntimeException x) { ! // listeners should not throw a runtime exception. ! // move on. ! x.printStackTrace(); ! } ! } } ! /** ! * Sends an event, using the code specified. ! * @param eventCode an event code ! */ void sendEvent(int eventCode) { ! sendEvent(new ConnectionEvent(this, eventCode)); } ! public static synchronized long allocateProcessUniqueId() { ! return ++nextId; } ! /** ! * Output a datagram. If we run out of output buffer space, ! * we call h.overflow() to potentially fix the situation. ! * if overflow() returns true, we will attempt the output operation ! * again and repeat the process using the overflow handler returned ! * from h.getRetryHandler(). <p> ! * ! * In this way, it is possible to create a sequence of overflow handling ! * logic that is a markov process. If the overflow() routine ever returns ! * false, we abort the output operation. ! * ! * @throws IOException if the output fails due to I/O failure, or we are ! * not open. ! */ public void output(IDatagram d, IOverflowHandler h) ! throws IOException { ! if (!open) ! throw new IOException("not open"); ! ! try { ! writeMutex.acquire(); ! ! try { ! // make a sandbox for the output framer ! ByteBuffer output = writeBuffer.slice(); ! factory.outgoing(output, d); ! ! // update the write buffer position ! writeBuffer.position(writeBuffer.position() + output.position()); ! } catch(BufferOverflowException boe) { ! // ok flush some things maybe? ! flush(); ! ! // we just ran out of space. ! // handle it ! if (processOverflow(d, h)) { ! writeMutex.release(); ! output(d, h.getRetryHandler()); ! } ! } ! ! // flush the buffers ! flush(); ! } catch(InterruptedException ie) { ! // abort the current operation ! // we dont' have the mutex, so we can't ! // do anything to the buffer. ! } finally { ! writeMutex.release(); ! } } ! /** ! * Processes an overflow using the specified handler. This determines ! * if the handler can support extra connection information, and gives it ! * if so. ! */ private boolean processOverflow(IDatagram d, ! IOverflowHandler h) { ! if (h instanceof IConnectionOverflowHandler) { ! return ((IConnectionOverflowHandler)h).overflow(d, this, proc); ! } else { ! return h.overflow(d); ! } } ! /** ! * this method should be called when the caller has the ! * mutex on the write buffer. ! */ private void flush() ! throws IOException { ! // write the data out to the channel if there is any data. ! // if there's no data, this method will cause the ! // connection info to remember the write attempt and will ! // subsequently return true from readyToWrite() ! try ! { ! if (writeBuffer.position() > 0) ! { ! writeBuffer.flip(); ! doWrite(writeBuffer); ! efficientCompact(writeBuffer); ! } ! } ! catch(java.io.IOException iox) ! { ! // tell the listeners ! sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); ! ! // free resources. this connection is no longer usable. ! close(); ! ! // propagate the exception ! throw iox; ! } } ! /** ! * Writes the contents of the write buffer to its ultimate destination. ! * The position of the buffer will be set to zero and the limit will ! * indicate the number of valid bytes in the buffer on input. When the method ! * returns, the position of buffer will indicate how many bytes were ! * written to the destination. Bytes before the position of the buffer ! * when the method returns may be discarded. ! */ ! protected abstract void doWrite(ByteBuffer writeBuffer) ! throws java.io.IOException; ! //////// READ METHODS ! /** ! * Requests access to the read buffer for an input operation. This method is necessary ! * in order to obtain the mutex protecting this buffer. ! */ protected ByteBuffer getReadBuffer() ! throws InterruptedException { ! readMutex.acquire(); ! return readBuffer; } ! /** ! * Releases the mutex on the read buffer, indicating that the input operation ! * is completed. ! */ protected void releaseReadBuffer(ByteBuffer rb) { ! readMutex.release(); } ! /** ! * Processes data in the read buffer using the datagram factory specified ! * at creation time. ! */ public void processData() { ! try { ! readMutex.acquire(); ! ! // make a view on the data buffer. ! int expecting=0; ! preProcessData(); ! ! // iterate through the view's data until we run out. ! while(true) ! { ! // FRAMING ! // call the datagram factory to figure out ! // how much data we need. ! expecting = factory.frame(readBuffer); ! ! // PROCESS ! // if we have enough data. ! // if we don't, go back to the I/O processor. ! if (readBuffer.remaining() >= expecting) ! { ! // make a new process buffer. ! ByteBuffer process = readBuffer.slice(); ! process.limit(expecting); ! ! // read past the data so the buffer position is right after ! // the datagram we just read. this is an important ! // step to take for subclasses who may want to record ! // where in the buffer datagrams begin & end. ! readBuffer.position(readBuffer.position() + expecting); ! ! // go process it ! if (shouldProcess) { ! // load the datagram. ! IDatagram d = factory.incoming(process); ! ! // now we'll process it according to our RULES. ! proc.process(this, d); ! } ! } else { ! break; ! } ! } ! } ! catch (java.io.IOException ise) { ! // invalid data event. ! sendEvent(ConnectionEvent.CONNECTION_INVALID_PROTOCOL); ! ! // our read buffer is unintelligible. ! // we have no choice but to close the connection. ! close(); ! } ! catch (InterruptedException ie) { ! // NOTHING TO DO HERE. ! } finally { ! // do post processing cleanup ! postProcessData(); ! ! // done ! readMutex.release(); ! } } ! /** ! * prepares for a sequence of read operations ! * position <= limit and remaining() will reflect the number ! * of bytes processed. ! */ protected void preProcessData() { ! readBuffer.flip(); } ! /** ! * performs post processing after interpreting the contents of the read buffer. ! */ protected void postProcessData() { ! // discard the processed data. ! efficientCompact(readBuffer); } ! /** ! * Compacts a buffer. This is optimized here to avoid ! * a call to copyMemory() inside the NIO library that ! * is made even if there are no bytes to copy, i.e. ! * <code>remaining</code> returns 0. ! */ private void efficientCompact(ByteBuffer bb) { ! if (bb.hasRemaining()) ! bb.compact(); ! else ! bb.clear(); } ! public String toString() { ! return getId(); } ! public final String getId() { ! return id; } ! public boolean equals(Object o) { ! try ! { ! return (getId().equals( ((ConnectionInfo)o).getId())); ! } ! catch (ClassCastException e) ! { ! return false; ! } } ! public int hashCode() { ! return getId().hashCode(); } } --- 19,504 ---- { private String id; ! /** ! * Indicates whether datagrams read from this connection ! * should be passed to the message processor. When set to false, ! * incoming datagrams are discarded. ! */ protected boolean shouldProcess; ! private boolean open; ! private ByteBuffer readBuffer; private ByteBuffer writeBuffer; ! private Mutex readMutex, writeMutex; ! private List eventHandlers; ! // statics private static long nextId = 2; ! /** ! * The read/write buffer size, as specified by the global property configurator. ! */ protected static final int MAX_READ = Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_BUFFER_SIZE, ! "1048576")).intValue(); ! /** ! * The flush threshold, as specified by the global configurator. ! */ protected static final int FLUSH_BUFFER_THRESHOLD = MAX_READ / Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_FLUSH_DIVISOR, "2")).intValue(); ! // the message processor. very important. private final IMessageProcessor proc; ! // the datagram factory. also very important. private final IDatagramFactory factory; ! /** ! * Uses buffer sizes from the global configurator. ! * ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! */ public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f) { ! this(p, ! f, ! MAX_READ, ! MAX_READ); } ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! * @param rbuf the size, in bytes, of the read buffer. ! * @param wbuf the size, in bytes, of the write buffer. ! */ public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f, ! int rbuf, ! int wbuf) { ! this(p, ! f, ! (rbuf > 0) ? ByteBuffer.allocateDirect(rbuf) : null, ! (wbuf > 0) ? ByteBuffer.allocateDirect(wbuf) : null); } ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! * @param r the actual ByteBuffer used as the read buffer ! * @param w the actual ByteBuffer used as the write buffer ! */ public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f, ! ByteBuffer r, ! ByteBuffer w) { ! readBuffer = r; ! if (readBuffer != null) ! { ! readMutex = new Mutex(); ! } ! ! writeBuffer = w; ! if (writeBuffer != null) ! { ! writeMutex = new Mutex(); ! } ! ! this.factory = f; ! id = String.valueOf(allocateProcessUniqueId()); ! shouldProcess = true; ! open = true; ! eventHandlers = new LinkedList(); ! proc = p; ! ! // send connected event ! sendEvent(ConnectionEvent.CONNECTION_CONNECTED); } ! public void close() { ! if (open) ! { ! open = false; ! proc.remove(this); ! ! // make sure everything is out of the ! // wbuffer ! assert writeBuffer.position() == 0 : "buffer not empty"; ! ! // closed normally (or abnormally) ! sendEvent(ConnectionEvent.CONNECTION_CLOSED); ! } } ! public boolean isOpen() { ! return open; } ! public void addEventListener(ConnectionEventListener l) { ! eventHandlers.add(l); } ! public void removeEventListener(ConnectionEventListener l) { ! eventHandlers.remove(l); } ! /** ! * Sends an event to all the registered event listeners. ! * ! * @param event the event object ! */ protected void sendEvent(ConnectionEvent event) { ! Utility.getLogger().fine("sending connection event " + event); ! ! Iterator iter = eventHandlers.iterator(); ! while (iter.hasNext()) ! { ! ConnectionEventListener l = (ConnectionEventListener)iter.next(); ! try ! { ! l.connectionEvent(event); ! } ! catch(RuntimeException x) ! { ! // listeners should not throw a runtime exception. ! // move on. ! x.printStackTrace(); ! } ! } } ! /** ! * Sends an event, using the code specified. ! * @param eventCode an event code ! */ void sendEvent(int eventCode) { ! sendEvent(new ConnectionEvent(this, eventCode)); } ! public static synchronized long allocateProcessUniqueId() { ! return ++nextId; } ! /** ! * Output a datagram. If we run out of output buffer space, ! * we call h.overflow() to potentially fix the situation. ! * if overflow() returns true, we will attempt the output operation ! * again and repeat the process using the overflow handler returned ! * from h.getRetryHandler(). <p> ! * ! * In this way, it is possible to create a sequence of overflow handling ! * logic that is a markov process. If the overflow() routine ever returns ! * false, we abort the output operation. ! * ! * @throws IOException if the output fails due to I/O failure, or we are ! * not open. ! */ public void output(IDatagram d, IOverflowHandler h) ! throws IOException { ! if (!open) ! throw new IOException("not open"); ! ! try ! { ! writeMutex.acquire(); ! ! try ! { ! // make a sandbox for the output framer ! ByteBuffer output = writeBuffer.slice(); ! factory.outgoing(output, d); ! ! // update the write buffer position ! writeBuffer.position(writeBuffer.position() + output.position()); ! } ! catch(BufferOverflowException boe) ! { ! // ok flush some things maybe? ! flush(); ! ! // we just ran out of space. ! // handle it ! if (processOverflow(d, h)) ! { ! writeMutex.release(); ! output(d, h.getRetryHandler()); ! } ! } ! ! // flush the buffers ! flush(); ! } ! catch(InterruptedException ie) ! { ! // abort the current operation ! // we dont' have the mutex, so we can't ! // do anything to the buffer. ! } ! finally ! { ! writeMutex.release(); ! } } ! /** ! * Processes an overflow using the specified handler. This determines ! * if the handler can support extra connection information, and gives it ! * if so. ! */ private boolean processOverflow(IDatagram d, ! IOverflowHandler h) { ! if (h instanceof IConnectionOverflowHandler) ! { ! return ((IConnectionOverflowHandler)h).overflow(d, this, proc); ! } ! else ! { ! return h.overflow(d); ! } } ! ! public void requestFlush() ! throws IOException ! { ! try ! { ! writeMutex.acquire(); ! flush(); ! } ! catch(InterruptedException ie) ! { ! // abort the current operation ! // we dont' have the mutex, so we can't ! // do anything to the buffer. ! } ! finally ! { ! writeMutex.release(); ! } ! } ! /** ! * this method should be called when the caller has the ! * mutex on the write buffer. ! */ private void flush() ! throws IOException { ! // write the data out to the channel if there is any data. ! // if there's no data, this method will cause the ! // connection info to remember the write attempt and will ! // subsequently return true from readyToWrite() ! try ! { ! if (writeBuffer.position() > 0) ! { ! writeBuffer.flip(); ! int n = doWrite(writeBuffer); ! efficientCompact(writeBuffer); ! } ! } ! catch(java.io.IOException iox) ! { ! // tell the listeners ! sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); ! ! // free resources. this connection is no longer usable. ! close(); ! ! // propagate the exception ! throw iox; ! } } ! /** ! * Writes the contents of the write buffer to its ultimate destination. ! * The position of the buffer will be set to zero and the limit will ! * indicate the number of valid bytes in the buffer on input. When the method ! * returns, the position of buffer will indicate how many bytes were ! * written to the destination. Bytes before the position of the buffer ! * when the method returns may be discarded. ! */ ! protected abstract int doWrite(ByteBuffer writeBuffer) ! throws java.io.IOException; ! //////// READ METHODS ! /** ! * Requests access to the read buffer for an input operation. This method is necessary ! * in order to obtain the mutex protecting this buffer. ! */ protected ByteBuffer getReadBuffer() ! throws InterruptedException { ! readMutex.acquire(); ! return readBuffer; } ! /** ! * Releases the mutex on the read buffer, indicating that the input operation ! * is completed. ! */ protected void releaseReadBuffer(ByteBuffer rb) { ! readMutex.release(); } ! /** ! * Processes data in the read buffer using the datagram factory specified ! * at creation time. ! */ public void processData() { ! try ! { ! readMutex.acquire(); ! ! // make a view on the data buffer. ! int expecting=0; ! preProcessData(); ! ! // iterate through the view's data until we run out. ! while(true) ! { ! // FRAMING ! // call the datagram factory to figure out ! // how much data we need. ! expecting = factory.frame(readBuffer); ! ! // PROCESS ! // if we have enough data. ! // if we don't, go back to the I/O processor. ! if (readBuffer.remaining() >= expecting) ! { ! // make a new process buffer. ! ByteBuffer process = readBuffer.slice(); ! process.limit(expecting); ! ! // read past the data so the buffer position is right after ! // the datagram we just read. this is an important ! // step to take for subclasses who may want to record ! // where in the buffer datagrams begin & end. ! readBuffer.position(readBuffer.position() + expecting); ! ! // go process it ! if (shouldProcess) ! { ! // load the datagram. ! IDatagram d = factory.incoming(process); ! ! // now we'll process it according to our RULES. ! proc.process(this, d); ! } ! } ! else ! { ! break; ! } ! } ! } ! catch (java.io.IOException ise) ! { ! // invalid data event. ! sendEvent(ConnectionEvent.CONNECTION_INVALID_PROTOCOL); ! ! // our read buffer is unintelligible. ! // we have no choice but to close the connection. ! close(); ! } ! catch (InterruptedException ie) ! { ! // NOTHING TO DO HERE. ! } ! finally ! { ! // do post processing cleanup ! postProcessData(); ! ! // done ! readMutex.release(); ! } } ! /** ! * prepares for a sequence of read operations ! * position <= limit and remaining() will reflect the number ! * of bytes processed. ! */ protected void preProcessData() { ! readBuffer.flip(); } ! /** ! * performs post processing after interpreting the contents of the read buffer. ! */ protected void postProcessData() { ! // discard the processed data. ! efficientCompact(readBuffer); } ! /** ! * Compacts a buffer. This is optimized here to avoid ! * a call to copyMemory() inside the NIO library that ! * is made even if there are no bytes to copy, i.e. ! * <code>remaining</code> returns 0. ! */ private void efficientCompact(ByteBuffer bb) { ! if (bb.hasRemaining()) ! { ! bb.compact(); ! } ! else ! bb.clear(); } ! public String toString() { ! return getId(); } ! public final String getId() { ! return id; } ! public boolean equals(Object o) { ! try ! { ! return (getId().equals( ((ConnectionInfo)o).getId())); ! } ! catch (ClassCastException e) ! { ! return false; ! } } ! public int hashCode() { ! return getId().hashCode(); } } Index: AbstractDatagram.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractDatagram.java,v retrieving revision 1.13 retrieving revision 1.14 diff -C2 -d -r1.13 -r1.14 *** AbstractDatagram.java 14 Nov 2002 22:25:30 -0000 1.13 --- AbstractDatagram.java 14 Jun 2003 18:14:32 -0000 1.14 *************** *** 81,93 **** if (sz == null) { ! bb.putShort((short)0); return; } else { int pos = bb.position(); ! bb.position(pos + 2); Utility.encode(sz, bb); ! bb.putShort(pos, (short)((bb.position() - pos - 2) & 0xFFFF)); } } --- 81,93 ---- if (sz == null) { ! bb.putInt(0); return; } else { int pos = bb.position(); ! bb.position(pos + 4); Utility.encode(sz, bb); ! bb.putInt(pos, bb.position() - pos - 4); } } *************** *** 103,107 **** protected static String readPascalString(ByteBuffer bb) { ! short n = bb.getShort(); if (n == 0) { return null; --- 103,107 ---- protected static String readPascalString(ByteBuffer bb) { ! int n = bb.getInt(); if (n == 0) { return null; Index: ConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ConnectionInfo.java,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 *** ConnectionInfo.java 6 Dec 2002 15:22:17 -0000 1.12 --- ConnectionInfo.java 14 Jun 2003 18:14:32 -0000 1.13 *************** *** 50,57 **** } ! public void doWrite(ByteBuffer writeBuffer) throws java.io.IOException { ! out().write(writeBuffer); } --- 50,57 ---- } ! public int doWrite(ByteBuffer writeBuffer) throws java.io.IOException { ! return out().write(writeBuffer); } Index: ReadWriteTransformThread.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ReadWriteTransformThread.java,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** ReadWriteTransformThread.java 21 Oct 2002 21:11:05 -0000 1.14 --- ReadWriteTransformThread.java 14 Jun 2003 18:14:32 -0000 1.15 *************** *** 90,94 **** // register the READ operation for the channel, and // attach the IConnectionInfo to the Key. ! final SelectionKey theKey = channel.register(readSelector, SelectionKey.OP_READ); theKey.attach(conn); --- 90,94 ---- // register the READ operation for the channel, and // attach the IConnectionInfo to the Key. ! final SelectionKey theKey = channel.register(readSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); theKey.attach(conn); *************** *** 123,126 **** --- 123,130 ---- conn.readFrom(incomingChannel, key); } + if (key.isWritable()) + { + conn.requestFlush(); + } } } |