[Ubermq-commits] jms/src/com/ubermq/kernel AbstractConnectionInfo.java,1.15,1.15.2.1
Brought to you by:
jimmyp
From: <ji...@us...> - 2002-10-08 19:50:10
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/kernel In directory usw-pr-cvs1:/tmp/cvs-serv17612/src/com/ubermq/kernel Modified Files: Tag: ubermq-1-0 AbstractConnectionInfo.java Log Message: multicast fixes Index: AbstractConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractConnectionInfo.java,v retrieving revision 1.15 retrieving revision 1.15.2.1 diff -C2 -d -r1.15 -r1.15.2.1 *** AbstractConnectionInfo.java 27 Sep 2002 21:29:03 -0000 1.15 --- AbstractConnectionInfo.java 8 Oct 2002 19:50:06 -0000 1.15.2.1 *************** *** 17,400 **** { protected String id; ! ! /** ! * Indicates whether datagrams read from this connection ! * should be passed to the message processor. When set to false, ! * incoming datagrams are discarded. ! */ protected boolean shouldProcess; ! private boolean open; ! private ByteBuffer readBuffer; private ByteBuffer writeBuffer; ! ! private Mutex readMutex, writeMutex; ! protected IConnectionExceptionHandler handler; ! // statics private static long nextId = 2; ! ! // 1MB r/w buffers by default. that is per connection. protected static final int MAX_READ = Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_BUFFER_SIZE, ! "1048576")).intValue(); ! ! // automatically flush when we get to this ratio in the buffer. protected static final int FLUSH_BUFFER_THRESHOLD = MAX_READ / Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_FLUSH_DIVISOR, "2")).intValue(); ! // the message processor. very important. private final IMessageProcessor proc; ! ! // the datagram factory. also very important. private final IDatagramFactory factory; ! ! /** ! * Uses buffer sizes from the global configurator. ! * ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! */ public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f) { ! this(p, ! f, ! MAX_READ, ! MAX_READ); } ! ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! * @param rbuf the size, in bytes, of the read buffer. ! * @param wbuf the size, in bytes, of the write buffer. ! */ public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f, ! int rbuf, ! int wbuf) ! { ! this(p, ! f, ! (rbuf > 0) ? ByteBuffer.allocateDirect(rbuf) : null, ! (wbuf > 0) ? ByteBuffer.allocateDirect(wbuf) : null); ! } ! ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! * @param r the actual ByteBuffer used as the read buffer ! * @param w the actual ByteBuffer used as the write buffer ! */ ! public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f, ! ByteBuffer r, ! ByteBuffer w) ! { ! readBuffer = r; ! if (readBuffer != null) { ! readMutex = new Mutex(); ! } ! ! writeBuffer = w; ! if (writeBuffer != null) { ! writeMutex = new Mutex(); ! } ! ! this.factory = f; ! ! id = String.valueOf(allocateProcessUniqueId()); ! ! proc = p; ! proc.accept(this); ! ! shouldProcess = true; ! open = true; } ! public void remove() { ! proc.remove(this); } ! public void close() { ! remove(); ! open = false; } ! public boolean isOpen() { ! return open; } ! /** ! * sets a listener object to get called back when exceptions ! * occur on the connection so resources can be cleaned up, etc. ! */ public void setExceptionHandler(IConnectionExceptionHandler h) { ! handler = h; } ! public static synchronized long allocateProcessUniqueId() { ! return ++nextId; } ! /** ! * Output a datagram. If we run out of output buffer space, ! * we call h.overflow() to potentially fix the situation. ! * if overflow() returns true, we will attempt the output operation ! * again and repeat the process using the overflow handler returned ! * from h.getRetryHandler(). <p> ! * ! * In this way, it is possible to create a sequence of overflow handling ! * logic that is a markov process. If the overflow() routine ever returns ! * false, we abort the output operation. ! * ! * @throws IllegalStateException if the output fails due to I/O failure. ! */ public void output(IDatagram d, IOverflowHandler h) { ! try { ! writeMutex.acquire(); ! ! try { ! // make a sandbox for the output framer ! ByteBuffer output = writeBuffer.slice(); ! factory.outgoing(output, d); ! ! // update the write buffer position ! writeBuffer.position(writeBuffer.position() + output.position()); ! } catch(BufferOverflowException boe) { ! // ok flush some things maybe? ! flush(); ! ! // we just ran out of space. ! // handle it ! if (processOverflow(d, h)) { ! writeMutex.release(); ! output(d, h.getRetryHandler()); ! } ! } ! ! // flush the buffers ! flush(); ! } catch(IOException iox) { ! // our I/O should be fail-fast, in other words, ! // we should propagate failures to the caller so that ! // reasonable things can be done. ! throw new IllegalStateException(); ! } catch(InterruptedException ie) { ! // abort the current operation ! // we dont' have the mutex, so we can't ! // do anything to the buffer. ! } finally { ! writeMutex.release(); ! } ! } ! ! /** ! * Processes an overflow using the specified handler. This determines ! * if the handler can support extra connection information, and gives it ! * if so. ! */ ! private boolean processOverflow(IDatagram d, ! IOverflowHandler h) ! { ! if (h instanceof IConnectionOverflowHandler) { ! return ((IConnectionOverflowHandler)h).overflow(d, this, proc); ! } else { ! return h.overflow(d); ! } ! } ! ! /** ! * thie method should be called when the caller has the ! * mutex on the write buffer ! */ ! private void flush() ! throws IOException ! { ! // write the data out to the channel if there is any data. ! // if there's no data, this method will cause the ! // connection info to remember the write attempt and will ! // subsequently return true from readyToWrite() ! try ! { ! if (writeBuffer.position() > 0) ! { ! writeBuffer.flip(); ! doWrite(writeBuffer); ! writeBuffer.compact(); ! } ! } ! catch(java.io.IOException iox) ! { ! // this is an abnormal close. first close our resources, ! // then call the exception handler. ! close(); ! ! if (handler != null) ! handler.onAbnormalClose(); ! ! throw iox; ! } ! } ! ! /** ! * Writes the contents of the write buffer to its ultimate destination. ! * The position of the buffer will be set to zero and the limit will ! * indicate the number of valid bytes in the buffer on input. When the method ! * returns, the position of buffer will indicate how many bytes were ! * written to the destination. Bytes before the position of the buffer ! * when the method returns may be discarded. ! */ ! public abstract void doWrite(ByteBuffer writeBuffer) ! throws java.io.IOException; ! ! //////// READ METHODS ! ! /** ! * Requests access to the read buffer for an input operation. This method is necessary ! * in order to obtain the mutex protecting this buffer. ! */ ! protected ByteBuffer getReadBuffer() ! throws InterruptedException ! { ! readMutex.acquire(); ! return readBuffer; ! } ! ! /** ! * Releases the mutex on the read buffer, indicating that the input operation ! * is completed. ! */ ! protected void releaseReadBuffer(ByteBuffer rb) ! { ! readMutex.release(); ! } ! ! /** ! * Processes data in the read buffer using the datagram factory specified ! * at creation time. ! */ ! public void processData() ! { ! if (!isOpen()) return; ! ! try { ! readMutex.acquire(); ! ! // make a view on the data buffer. ! int expecting=0; ! preProcessData(); ! ! // iterate through the view's data until we run out. ! while(true) ! { ! // FRAMING ! // call the datagram factory to figure out ! // how much data we need. ! ByteBuffer framed = readBuffer.asReadOnlyBuffer(); ! expecting = factory.frame(framed); ! ! // PROCESS ! // if we have enough data. ! // if we don't, go back to the I/O processor. ! if (framed.remaining() >= expecting) ! { ! // make a new process buffer. ! ByteBuffer process = framed.slice(); ! process.limit(expecting); ! ! // read past the data so the buffer position is right after ! // the datagram we just read. this is an important ! // step to take for subclasses who may want to record ! // where in the buffer datagrams begin & end. ! readBuffer.position(readBuffer.position() + expecting); ! ! // go process it ! if (shouldProcess) { ! // load the datagram. ! IDatagram d = factory.incoming(process); ! ! // now we'll process it according to our RULES. ! proc.process(this, d); ! } ! } else { ! break; ! } ! } ! } ! catch (java.io.IOException ise) { ! // our read buffer is unintelligible. ! // we have no choice but to close the connection. ! close(); ! } ! catch (InterruptedException ie) { ! // NOTHING TO DO HERE. ! } finally { ! // do post processing cleanup ! postProcessData(); ! ! // done ! readMutex.release(); ! } ! } ! ! /** ! * prepares for a sequence of read operations ! * position <= limit and remaining() will reflect the number ! * of bytes processed. ! */ ! protected void preProcessData() ! { ! readBuffer.flip(); ! } ! ! /** ! * performs post processing after interpreting the contents of the read buffer. ! */ ! protected void postProcessData() ! { ! // discard the processed data. ! readBuffer.compact(); ! } ! ! public String toString() ! { ! return getId(); ! } ! ! public String getId() ! { ! return id; ! } ! ! public boolean equals(Object o) ! { ! try ! { ! return (getId().equals( ((ConnectionInfo)o).getId())); ! } ! catch (ClassCastException e) ! { ! return false; ! } ! } ! ! public int hashCode() ! { ! return getId().hashCode(); ! } } --- 17,396 ---- { protected String id; ! ! /** ! * Indicates whether datagrams read from this connection ! * should be passed to the message processor. When set to false, ! * incoming datagrams are discarded. ! */ protected boolean shouldProcess; ! private boolean open; ! private ByteBuffer readBuffer; private ByteBuffer writeBuffer; ! ! private Mutex readMutex, writeMutex; ! protected IConnectionExceptionHandler handler; ! // statics private static long nextId = 2; ! ! // 1MB r/w buffers by default. that is per connection. protected static final int MAX_READ = Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_BUFFER_SIZE, ! "1048576")).intValue(); ! ! // automatically flush when we get to this ratio in the buffer. protected static final int FLUSH_BUFFER_THRESHOLD = MAX_READ / Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_FLUSH_DIVISOR, "2")).intValue(); ! // the message processor. very important. private final IMessageProcessor proc; ! ! // the datagram factory. also very important. private final IDatagramFactory factory; ! ! /** ! * Uses buffer sizes from the global configurator. ! * ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! */ public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f) { ! this(p, ! f, ! MAX_READ, ! MAX_READ); } ! ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! * @param rbuf the size, in bytes, of the read buffer. ! * @param wbuf the size, in bytes, of the write buffer. ! */ public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f, ! int rbuf, ! int wbuf) ! { ! this(p, ! f, ! (rbuf > 0) ? ByteBuffer.allocateDirect(rbuf) : null, ! (wbuf > 0) ? ByteBuffer.allocateDirect(wbuf) : null); } ! ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! * @param r the actual ByteBuffer used as the read buffer ! * @param w the actual ByteBuffer used as the write buffer ! */ ! public AbstractConnectionInfo(IMessageProcessor p, ! IDatagramFactory f, ! ByteBuffer r, ! ByteBuffer w) ! { ! readBuffer = r; ! readMutex = new Mutex(); ! ! writeBuffer = w; ! writeMutex = new Mutex(); ! ! this.factory = f; ! ! id = String.valueOf(allocateProcessUniqueId()); ! ! proc = p; ! proc.accept(this); ! ! shouldProcess = true; ! open = true; ! } ! public void remove() { ! proc.remove(this); } ! public void close() { ! remove(); ! open = false; } ! public boolean isOpen() { ! return open; } ! /** ! * sets a listener object to get called back when exceptions ! * occur on the connection so resources can be cleaned up, etc. ! */ public void setExceptionHandler(IConnectionExceptionHandler h) { ! handler = h; } ! public static synchronized long allocateProcessUniqueId() { ! return ++nextId; } ! /** ! * Output a datagram. If we run out of output buffer space, ! * we call h.overflow() to potentially fix the situation. ! * if overflow() returns true, we will attempt the output operation ! * again and repeat the process using the overflow handler returned ! * from h.getRetryHandler(). <p> ! * ! * In this way, it is possible to create a sequence of overflow handling ! * logic that is a markov process. If the overflow() routine ever returns ! * false, we abort the output operation. ! * ! * @throws IllegalStateException if the output fails due to I/O failure. ! */ public void output(IDatagram d, IOverflowHandler h) { ! try { ! writeMutex.acquire(); ! ! try { ! // make a sandbox for the output framer ! ByteBuffer output = writeBuffer.slice(); ! factory.outgoing(output, d); ! ! // update the write buffer position ! writeBuffer.position(writeBuffer.position() + output.position()); ! } catch(BufferOverflowException boe) { ! // ok flush some things maybe? ! flush(); ! ! // we just ran out of space. ! // handle it ! if (processOverflow(d, h)) { ! writeMutex.release(); ! output(d, h.getRetryHandler()); ! } ! } ! ! // flush the buffers ! flush(); ! } catch(IOException iox) { ! // our I/O should be fail-fast, in other words, ! // we should propagate failures to the caller so that ! // reasonable things can be done. ! throw new IllegalStateException(); ! } catch(InterruptedException ie) { ! // abort the current operation ! // we dont' have the mutex, so we can't ! // do anything to the buffer. ! } finally { ! writeMutex.release(); ! } ! } ! ! /** ! * Processes an overflow using the specified handler. This determines ! * if the handler can support extra connection information, and gives it ! * if so. ! */ ! private boolean processOverflow(IDatagram d, ! IOverflowHandler h) ! { ! if (h instanceof IConnectionOverflowHandler) { ! return ((IConnectionOverflowHandler)h).overflow(d, this, proc); ! } else { ! return h.overflow(d); ! } ! } ! ! /** ! * thie method should be called when the caller has the ! * mutex on the write buffer ! */ ! private void flush() ! throws IOException ! { ! // write the data out to the channel if there is any data. ! // if there's no data, this method will cause the ! // connection info to remember the write attempt and will ! // subsequently return true from readyToWrite() ! try ! { ! if (writeBuffer.position() > 0) ! { ! writeBuffer.flip(); ! doWrite(writeBuffer); ! writeBuffer.compact(); ! } ! } ! catch(java.io.IOException iox) ! { ! // this is an abnormal close. first close our resources, ! // then call the exception handler. ! close(); ! ! if (handler != null) ! handler.onAbnormalClose(); ! ! throw iox; ! } ! } ! ! /** ! * Writes the contents of the write buffer to its ultimate destination. ! * The position of the buffer will be set to zero and the limit will ! * indicate the number of valid bytes in the buffer on input. When the method ! * returns, the position of buffer will indicate how many bytes were ! * written to the destination. Bytes before the position of the buffer ! * when the method returns may be discarded. ! */ ! public abstract void doWrite(ByteBuffer writeBuffer) ! throws java.io.IOException; ! ! //////// READ METHODS ! ! /** ! * Requests access to the read buffer for an input operation. This method is necessary ! * in order to obtain the mutex protecting this buffer. ! */ ! protected ByteBuffer getReadBuffer() ! throws InterruptedException ! { ! readMutex.acquire(); ! return readBuffer; ! } ! ! /** ! * Releases the mutex on the read buffer, indicating that the input operation ! * is completed. ! */ ! protected void releaseReadBuffer(ByteBuffer rb) ! { ! readMutex.release(); ! } ! ! /** ! * Processes data in the read buffer using the datagram factory specified ! * at creation time. ! */ ! public void processData() ! { ! if (!isOpen()) return; ! ! try { ! readMutex.acquire(); ! ! // make a view on the data buffer. ! int expecting=0; ! preProcessData(); ! ! // iterate through the view's data until we run out. ! while(true) ! { ! // FRAMING ! // call the datagram factory to figure out ! // how much data we need. ! ByteBuffer framed = readBuffer.asReadOnlyBuffer(); ! expecting = factory.frame(framed); ! ! // PROCESS ! // if we have enough data. ! // if we don't, go back to the I/O processor. ! if (framed.remaining() >= expecting) ! { ! // make a new process buffer. ! ByteBuffer process = framed.slice(); ! process.limit(expecting); ! ! // read past the data so the buffer position is right after ! // the datagram we just read. this is an important ! // step to take for subclasses who may want to record ! // where in the buffer datagrams begin & end. ! readBuffer.position(readBuffer.position() + expecting); ! ! // go process it ! if (shouldProcess) { ! // load the datagram. ! IDatagram d = factory.incoming(process); ! ! // now we'll process it according to our RULES. ! proc.process(this, d); ! } ! } else { ! break; ! } ! } ! } ! catch (java.io.IOException ise) { ! // our read buffer is unintelligible. ! // we have no choice but to close the connection. ! close(); ! } ! catch (InterruptedException ie) { ! // NOTHING TO DO HERE. ! } finally { ! // do post processing cleanup ! postProcessData(); ! ! // done ! readMutex.release(); ! } ! } ! ! /** ! * prepares for a sequence of read operations ! * position <= limit and remaining() will reflect the number ! * of bytes processed. ! */ ! protected void preProcessData() ! { ! readBuffer.flip(); ! } ! ! /** ! * performs post processing after interpreting the contents of the read buffer. ! */ ! protected void postProcessData() ! { ! // discard the processed data. ! readBuffer.compact(); ! } ! ! public String toString() ! { ! return getId(); ! } ! ! public String getId() ! { ! return id; ! } ! ! public boolean equals(Object o) ! { ! try ! { ! return (getId().equals( ((ConnectionInfo)o).getId())); ! } ! catch (ClassCastException e) ! { ! return false; ! } ! } ! ! public int hashCode() ! { ! return getId().hashCode(); ! } } |