[Ubermq-commits] jms/src/com/ubermq/kernel Chooser.java,NONE,1.1 DatagramSink.java,NONE,1.1 Abstract
Brought to you by:
jimmyp
From: <ji...@us...> - 2002-10-18 16:59:34
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/kernel In directory usw-pr-cvs1:/tmp/cvs-serv28217/src/com/ubermq/kernel Modified Files: AbstractConnectionInfo.java ConnectionInfo.java IConnectionInfo.java KernelBasedServer.java PipeConnectionInfo.java ReadWriteTransformThread.java Added Files: Chooser.java DatagramSink.java Log Message: massive changes: 1. new failover and URL connection architecture. supports arbitrary URL schemes for registration. 2. connection state events, and an event listener framework for those events. 3. general refactoring and cleanup. --- NEW FILE: Chooser.java --- package com.ubermq.kernel; import java.util.*; /** * Chooses an object from a supplied set of objects. */ public interface Chooser { /** * Resets any state, if applicable. */ public void reset(); /** * Chooses a single object contained in the * given <code>Collection</code> and returns * it. * * @param objects an unordered Collection of objects * @return an Object r, such that <code>objects.contains(r)</code> is true. * @throws IllegalStateException if the chooser can * only operate on a single Collection object, and * the Collection does not match that of the previous * caller. */ public Object choose(Collection objects); } --- NEW FILE: DatagramSink.java --- package com.ubermq.kernel; import com.ubermq.kernel.IDatagram; import com.ubermq.kernel.IOverflowHandler; import java.io.IOException; /** * An I/O oriented sink for datagrams, providing * output capability to abstract endpoints. Overflow * and failure handling is also a requirement for callers.<P> * * If a datagram is to be routed inside the local JVM, * the DatagramEndpoint interface is a more appropriate choice.<P> * * @see com.ubermq.kernel.IDatagramEndpoint */ public interface DatagramSink { /** * Determines whether the sink is available for output, * and whether an <code>output</code> would most likely * succeed. * * @return true if the output dest node is available */ public boolean isOpen(); /** * Outputs a datagram. 
If the datagram causes an overflow, * the given overflow handler is called to determine whether the * operation should be retried. * * @param d a datagram * @param h the overflow handler to use * @throws IOException if the endpoint is not open, or the * endpoint failed during the output call and is no longer valid. */ public void output(IDatagram d, IOverflowHandler h) throws IOException; } Index: AbstractConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractConnectionInfo.java,v retrieving revision 1.19 retrieving revision 1.20 diff -C2 -d -r1.19 -r1.20 *** AbstractConnectionInfo.java 10 Oct 2002 20:02:59 -0000 1.19 --- AbstractConnectionInfo.java 18 Oct 2002 16:59:30 -0000 1.20 *************** *** 1,10 **** package com.ubermq.kernel; import java.nio.*; import java.util.*; - import com.ubermq.kernel.event.*; - - import EDU.oswego.cs.dl.util.concurrent.Mutex; - import java.io.IOException; /** --- 1,10 ---- package com.ubermq.kernel; + import EDU.oswego.cs.dl.util.concurrent.*; + import com.ubermq.*; + import com.ubermq.kernel.event.*; + import java.io.*; import java.nio.*; import java.util.*; /** *************** *** 131,146 **** } - public void remove() - { - proc.remove(this); - } - public void close() { ! remove(); ! open = false; ! // closed normally ! sendEvent(ConnectionEvent.CONNECTION_CLOSED); } --- 131,148 ---- } public void close() { ! if (open) ! { ! open = false; ! proc.remove(this); ! // make sure everything is out of the ! // wbuffer ! assert writeBuffer.position() == 0 : "buffer not empty"; ! ! // closed normally (or abnormally) ! sendEvent(ConnectionEvent.CONNECTION_CLOSED); ! 
} } *************** *** 167,170 **** --- 169,174 ---- protected void sendEvent(ConnectionEvent event) { + Utility.getLogger().fine("sending connection event " + event); + Iterator iter = eventHandlers.iterator(); while (iter.hasNext()) *************** *** 176,179 **** --- 180,184 ---- // listeners should not throw a runtime exception. // move on. + x.printStackTrace(); } } *************** *** 205,212 **** * false, we abort the output operation. * ! * @throws IllegalStateException if the output fails due to I/O failure. */ public void output(IDatagram d, IOverflowHandler h) { try { writeMutex.acquire(); --- 210,222 ---- * false, we abort the output operation. * ! * @throws IOException if the output fails due to I/O failure, or we are ! * not open. */ public void output(IDatagram d, IOverflowHandler h) + throws IOException { + if (!open) + throw new IOException("not open"); + try { writeMutex.acquire(); *************** *** 233,241 **** // flush the buffers flush(); - } catch(IOException iox) { - // our I/O should be fail-fast, in other words, - // we should propagate failures to the caller so that - // reasonable things can be done. - throw new IllegalStateException(iox.getMessage()); } catch(InterruptedException ie) { // abort the current operation --- 243,246 ---- *************** *** 263,268 **** /** ! * thie method should be called when the caller has the ! * mutex on the write buffer */ private void flush() --- 268,273 ---- /** ! * this method should be called when the caller has the ! * mutex on the write buffer. */ private void flush() *************** *** 287,291 **** sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); ! // this is an abnormal situation. close our resources. close(); --- 292,296 ---- sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); ! // free resources. this connection is no longer usable. 
close(); *************** *** 334,339 **** public void processData() { - if (!isOpen()) return; - try { readMutex.acquire(); --- 339,342 ---- *************** *** 425,432 **** private void efficientCompact(ByteBuffer bb) { ! if (bb.remaining() == 0) ! bb.clear(); ! else bb.compact(); } --- 428,435 ---- private void efficientCompact(ByteBuffer bb) { ! if (bb.hasRemaining()) bb.compact(); + else + bb.clear(); } Index: ConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ConnectionInfo.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** ConnectionInfo.java 12 Sep 2002 22:23:10 -0000 1.9 --- ConnectionInfo.java 18 Oct 2002 16:59:31 -0000 1.10 *************** *** 1,8 **** package com.ubermq.kernel; import java.nio.*; import java.nio.channels.*; - import java.nio.charset.Charset; - /** --- 1,7 ---- package com.ubermq.kernel; + import com.ubermq.kernel.event.*; import java.nio.*; import java.nio.channels.*; /** *************** *** 14,86 **** private WritableByteChannel out; private ReadableByteChannel in; ! ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! */ ! public ConnectionInfo(IMessageProcessor p, ! IDatagramFactory f) ! { ! super(p,f); ! } ! public void start() { ! shouldProcess = true; } ! public void stop() { ! shouldProcess = false; } ! public ReadableByteChannel in() {return in;} public WritableByteChannel out() {return out;} ! ! /** ! * Attaches an input and output channel to this connection. They may be ! * the same object, if the channel implements both readable and writable ! * interfaces. ! */ public void attach(ReadableByteChannel in, WritableByteChannel out) { ! this.in = in; ! this.out = out; } ! ! public void doWrite(ByteBuffer writeBuffer) ! 
throws java.io.IOException ! { ! out().write(writeBuffer); ! } ! ! /** ! * Reads from the specified byte channel into the read buffer. This method ! * is usually called by a dedicated I/O service thread when the channel ! * has indicated that it has data to be consumed. ! */ ! public void readFrom(ReadableByteChannel channel) { ! ByteBuffer readBuffer = null; ! try ! { ! readBuffer = getReadBuffer(); ! channel.read(readBuffer); ! } ! catch(java.io.IOException iox) ! { ! close(); ! } ! catch(InterruptedException ie) { ! // return to caller ! } ! finally { ! releaseReadBuffer(readBuffer); ! } ! // now we process the new data ! processData(); } } --- 13,98 ---- private WritableByteChannel out; private ReadableByteChannel in; ! ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! */ ! public ConnectionInfo(IMessageProcessor p, ! IDatagramFactory f) ! { ! super(p,f); ! } ! public void start() { ! shouldProcess = true; } ! public void stop() { ! shouldProcess = false; } ! public ReadableByteChannel in() {return in;} public WritableByteChannel out() {return out;} ! ! /** ! * Attaches an input and output channel to this connection. They may be ! * the same object, if the channel implements both readable and writable ! * interfaces. ! */ public void attach(ReadableByteChannel in, WritableByteChannel out) { ! this.in = in; ! this.out = out; } ! ! public void doWrite(ByteBuffer writeBuffer) ! throws java.io.IOException { ! out().write(writeBuffer); ! } ! /** ! * Reads from the specified byte channel into the read buffer. This method ! * is usually called by a dedicated I/O service thread when the channel ! * has indicated that it has data to be consumed. ! */ ! void readFrom(ReadableByteChannel channel) ! { ! ByteBuffer readBuffer = null; ! try ! { ! readBuffer = getReadBuffer(); ! 
int n = channel.read(readBuffer); ! ! // if we are at End Of Stream, close() or ! // throw an IOException. if we didn't close ! // the channel ourselves, we assume the server ! // closed us due to an abnormal condition. ! if (n == -1) { ! if (channel.isOpen()) ! throw new ClosedChannelException(); ! else ! close(); ! } ! } ! catch(java.io.IOException iox) ! { ! sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); ! close(); ! } ! catch(InterruptedException ie) { ! // return to caller ! return; ! } ! finally { ! releaseReadBuffer(readBuffer); ! } ! ! // process the data ! processData(); } } Index: IConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/IConnectionInfo.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** IConnectionInfo.java 8 Oct 2002 21:24:36 -0000 1.8 --- IConnectionInfo.java 18 Oct 2002 16:59:31 -0000 1.9 *************** *** 1,5 **** package com.ubermq.kernel; ! import com.ubermq.jms.server.routing.OutputDestNode; import com.ubermq.kernel.event.ConnectionEventListener; --- 1,5 ---- package com.ubermq.kernel; ! import com.ubermq.kernel.DatagramSink; import com.ubermq.kernel.event.ConnectionEventListener; *************** *** 9,13 **** */ public interface IConnectionInfo ! extends OutputDestNode { /** --- 9,13 ---- */ public interface IConnectionInfo ! extends DatagramSink { /** *************** *** 17,26 **** */ public void close(); - - /** - * Remove the connection from active processing, without closing it. - * Calling close() implies remove(). 
- */ - public void remove(); /** --- 17,20 ---- Index: KernelBasedServer.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/KernelBasedServer.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** KernelBasedServer.java 4 Oct 2002 16:04:16 -0000 1.6 --- KernelBasedServer.java 18 Oct 2002 16:59:31 -0000 1.7 *************** *** 4,9 **** import java.net.*; import java.util.*; - import com.ubermq.jms.server.journal.impl.*; - import com.ubermq.jms.server.proc.*; /** --- 4,7 ---- *************** *** 15,66 **** implements Runnable { ! /** ! * Creates a server, using the specified properties file. ! * ! * @param propsFile the filename of the properties file to retrieve. If this is null, ! * the default is used. ! * @see ConfigConstants.DEFAULT_PROPS_FILE ! */ public KernelBasedServer(String propsFile) { ! try ! { ! Configurator.setup(new java.io.FileInputStream(propsFile != null ? propsFile : ConfigConstants.DEFAULT_PROPS_FILE)); ! } ! catch (java.io.IOException e) ! { ! com.ubermq.Utility.getLogger().throwing("", "", e); ! } ! finally ! { ! init(); ! } } ! ! /** ! * Creates a server using the specified Properties object ! * to provide server configuration settings. ! * @param props a Properties object, containing configuration settings. ! */ ! public KernelBasedServer(Properties props) ! { ! Configurator.setup(props); ! init(); ! } ! ! /** ! * Initializes the server. <P> ! * Implementation specific initialization may take place here. This is called ! * after the properties file is loaded into the global Configurator, and ! * before <code>recover</code> or <code>exec</code>. ! */ protected void init() { } ! ! /** ! * Recovers operation from a possible failure. This is called immediately ! * prior to calling <code>exec</code>. ! */ protected void recover() { --- 13,64 ---- implements Runnable { ! /** ! * Creates a server, using the specified properties file. ! * ! 
* @param propsFile the filename of the properties file to retrieve. If this is null, ! * the default is used. ! * @see ConfigConstants.DEFAULT_PROPS_FILE ! */ public KernelBasedServer(String propsFile) { ! try ! { ! Configurator.setup(new java.io.FileInputStream(propsFile != null ? propsFile : ConfigConstants.DEFAULT_PROPS_FILE)); ! } ! catch (java.io.IOException e) ! { ! com.ubermq.Utility.getLogger().throwing("", "", e); ! } ! finally ! { ! init(); ! } } ! ! /** ! * Creates a server using the specified Properties object ! * to provide server configuration settings. ! * @param props a Properties object, containing configuration settings. ! */ ! public KernelBasedServer(Properties props) ! { ! Configurator.setup(props); ! init(); ! } ! ! /** ! * Initializes the server. <P> ! * Implementation specific initialization may take place here. This is called ! * after the properties file is loaded into the global Configurator, and ! * before <code>recover</code> or <code>exec</code>. ! */ protected void init() { } ! ! /** ! * Recovers operation from a possible failure. This is called immediately ! * prior to calling <code>exec</code>. ! */ protected void recover() { *************** *** 68,102 **** /** ! * Runs the server. ! */ ! public void run() { ! recover(); ! exec(); } ! ! /** ! * Provides a URL at which the server is providing its functionality. ! * The default implementation returns <code>host:port</code>. Subclasses ! * may override in order to provide a protocol specifier prefix, or replace ! * the logic here entirely. ! */ public String getServiceUrl() { ! int port = Integer.valueOf(Configurator.getProperty(ConfigConstants.SERVER_PORT)).intValue(); ! try ! { ! return InetAddress.getLocalHost().getHostName() + ":" + port; ! } ! catch (UnknownHostException e) { ! com.ubermq.Utility.getLogger().throwing("", "", e); ! return "127.0.0.1" + ":" + port; ! } } ! ! /** ! * Begins the operation of the server. Called third - after <code>init</code> ! 
* and <code>recover</code>. ! */ protected abstract void exec(); } --- 66,100 ---- /** ! * Runs the server. ! */ ! public void run() { ! recover(); ! exec(); } ! ! /** ! * Provides a URL at which the server is providing its functionality. ! * The default implementation returns <code>host:port</code>. Subclasses ! * may override in order to provide a protocol specifier prefix, or replace ! * the logic here entirely. ! */ public String getServiceUrl() { ! int port = Integer.valueOf(Configurator.getProperty(ConfigConstants.SERVER_PORT)).intValue(); ! try ! { ! return InetAddress.getLocalHost().getHostName() + ":" + port; ! } ! catch (UnknownHostException e) { ! com.ubermq.Utility.getLogger().throwing("", "", e); ! return "127.0.0.1" + ":" + port; ! } } ! ! /** ! * Begins the operation of the server. Called third - after <code>init</code> ! * and <code>recover</code>. ! */ protected abstract void exec(); } Index: PipeConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/PipeConnectionInfo.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** PipeConnectionInfo.java 26 Sep 2002 19:52:56 -0000 1.1 --- PipeConnectionInfo.java 18 Oct 2002 16:59:31 -0000 1.2 *************** *** 15,50 **** */ public class PipeConnectionInfo ! extends ConnectionInfo { ! /** ! * Creates a pipe connection record object that ! * reads from the given Pipe.SourceChannel and writes to a ! * Pipe.SinkChannel. These will generally not be channels ! * on the same Pipe object. ! * <P> ! * A typical implementation uses two pipes to form a bidirectional pipe ! * with client on one end and server on the other. ! * <P> ! * @param in the source channel to read from. ! * @param out the sink channel to write to. ! */ ! public PipeConnectionInfo(Pipe.SourceChannel in, ! Pipe.SinkChannel out, ! IDatagramFactory f, ! IMessageProcessor proc) ! { ! super(proc, f); ! attach(in, out); ! } ! ! 
public void close() ! { ! super.close(); ! try ! { ! in().close(); ! out().close(); ! } ! catch (java.io.IOException e) {/*we are closing anyway*/} ! } } --- 15,50 ---- */ public class PipeConnectionInfo ! extends ConnectionInfo { ! /** ! * Creates a pipe connection record object that ! * reads from the given Pipe.SourceChannel and writes to a ! * Pipe.SinkChannel. These will generally not be channels ! * on the same Pipe object. ! * <P> ! * A typical implementation uses two pipes to form a bidirectional pipe ! * with client on one end and server on the other. ! * <P> ! * @param in the source channel to read from. ! * @param out the sink channel to write to. ! */ ! public PipeConnectionInfo(Pipe.SourceChannel in, ! Pipe.SinkChannel out, ! IDatagramFactory f, ! IMessageProcessor proc) ! { ! super(proc, f); ! attach(in, out); ! } ! ! public void close() ! { ! super.close(); ! try ! { ! out().close(); ! in().close(); ! } ! catch (java.io.IOException e) {/*we are closing anyway*/} ! } } Index: ReadWriteTransformThread.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ReadWriteTransformThread.java,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 *** ReadWriteTransformThread.java 10 Oct 2002 14:32:38 -0000 1.12 --- ReadWriteTransformThread.java 18 Oct 2002 16:59:31 -0000 1.13 *************** *** 1,9 **** package com.ubermq.kernel; import java.nio.channels.*; import java.util.*; - import java.io.IOException; - /** * A generic, selector-based I/O thread that works with a <code>ConnectionList</code> --- 1,9 ---- package com.ubermq.kernel; + import com.ubermq.kernel.event.*; + import java.io.*; import java.nio.channels.*; import java.util.*; /** * A generic, selector-based I/O thread that works with a <code>ConnectionList</code> *************** *** 51,56 **** acceptPendingRequests(); } - catch(CancelledKeyException cke) - {} catch(Exception ex) { --- 51,54 ---- 
*************** *** 89,120 **** { channel.configureBlocking(false); ! SelectionKey theKey = channel.register(readSelector, SelectionKey.OP_READ); theKey.attach(conn); } private synchronized void acceptPendingRequests() throws IOException { ! try ! { ! Set readyKeys = readSelector.selectedKeys(); ! for(Iterator i = readyKeys.iterator(); i.hasNext(); ) ! { ! SelectionKey key = (SelectionKey)i.next(); ! i.remove(); ! ConnectionInfo conn = (ConnectionInfo)key.attachment(); ! ReadableByteChannel incomingChannel = (ReadableByteChannel)key.channel(); ! if (key.isReadable()) ! { ! // read the data from the channel ! conn.readFrom(incomingChannel); ! } } } - catch(java.nio.channels.CancelledKeyException cke) - {/* OK */} - } --- 87,127 ---- { channel.configureBlocking(false); ! ! // register the READ operation for the channel, and ! // attach the IConnectionInfo to the Key. ! final SelectionKey theKey = channel.register(readSelector, SelectionKey.OP_READ); theKey.attach(conn); + + // register a close event handler, + // so we can dispose of the connection selection key + // when it is closed. + conn.addEventListener(new ConnectionEventListener() { + public void connectionEvent(ConnectionEvent e) + { + if (e.getEventCode() == ConnectionEvent.CONNECTION_CLOSED) + theKey.cancel(); + } + }); } private synchronized void acceptPendingRequests() throws IOException { ! Set readyKeys = readSelector.selectedKeys(); ! for(Iterator i = readyKeys.iterator(); i.hasNext(); ) ! { ! SelectionKey key = (SelectionKey)i.next(); ! i.remove(); ! ConnectionInfo conn = (ConnectionInfo)key.attachment(); ! ReadableByteChannel incomingChannel = (ReadableByteChannel)key.channel(); ! if (key.isReadable() && ! key.isValid()) ! { ! // read the data from the channel ! conn.readFrom(incomingChannel); } } } |