[Ubermq-commits] jms/src/com/ubermq/kernel AbstractConnectionInfo.java,1.23,1.24 AbstractDatagram.java,1.14,1.15 ConnectionInfo.java,1.13,1.14 ReadWriteTransformThread.java,1.15,1.16
Brought to you by:
jimmyp
Update of /cvsroot/ubermq/jms/src/com/ubermq/kernel In directory sc8-pr-cvs1:/tmp/cvs-serv15283/src/com/ubermq/kernel Modified Files: AbstractConnectionInfo.java AbstractDatagram.java ConnectionInfo.java ReadWriteTransformThread.java Log Message: actual fix for partial writes due to socket buffer size limits Index: AbstractConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractConnectionInfo.java,v retrieving revision 1.23 retrieving revision 1.24 diff -C2 -d -r1.23 -r1.24 *** AbstractConnectionInfo.java 14 Jun 2003 18:14:32 -0000 1.23 --- AbstractConnectionInfo.java 14 Jun 2003 19:21:28 -0000 1.24 *************** *** 315,319 **** { writeBuffer.flip(); ! int n = doWrite(writeBuffer); efficientCompact(writeBuffer); } --- 315,319 ---- { writeBuffer.flip(); ! doWrite(writeBuffer); efficientCompact(writeBuffer); } Index: AbstractDatagram.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/AbstractDatagram.java,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** AbstractDatagram.java 14 Jun 2003 18:14:32 -0000 1.14 --- AbstractDatagram.java 14 Jun 2003 19:21:28 -0000 1.15 *************** *** 75,79 **** /** * Writes a String to a buffer in the default encoding, preceded by a ! * two byte length. */ protected static void writePascalString(String sz, ByteBuffer bb) --- 75,79 ---- /** * Writes a String to a buffer in the default encoding, preceded by a ! * int length. 
*/ protected static void writePascalString(String sz, ByteBuffer bb) Index: ConnectionInfo.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ConnectionInfo.java,v retrieving revision 1.13 retrieving revision 1.14 diff -C2 -d -r1.13 -r1.14 *** ConnectionInfo.java 14 Jun 2003 18:14:32 -0000 1.13 --- ConnectionInfo.java 14 Jun 2003 19:21:28 -0000 1.14 *************** *** 13,97 **** private WritableByteChannel out; private ReadableByteChannel in; ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! */ public ConnectionInfo(IMessageProcessor p, ! IDatagramFactory f) { ! super(p,f); } ! public void start() { ! shouldProcess = true; } ! public void stop() { ! shouldProcess = false; } ! public ReadableByteChannel in() {return in;} public WritableByteChannel out() {return out;} ! /** ! * Attaches an input and output channel to this connection. They may be ! * the same object, if the channel implements both readable and writable ! * interfaces. ! */ public void attach(ReadableByteChannel in, WritableByteChannel out) { ! this.in = in; ! this.out = out; } ! public int doWrite(ByteBuffer writeBuffer) ! throws java.io.IOException { ! return out().write(writeBuffer); } ! /** ! * Reads from the specified byte channel into the read buffer. This method ! * is usually called by a dedicated I/O service thread when the channel ! * has indicated that it has data to be consumed. ! */ void readFrom(ReadableByteChannel channel, ! SelectionKey key) { ! ByteBuffer readBuffer = null; ! try ! { ! readBuffer = getReadBuffer(); ! int n = channel.read(readBuffer); ! ! // if were are at End Of Stream, we cancel ! // the read selection key. ! if (n == -1) { ! key.cancel(); ! ! // close the channels ! close(); ! } ! } ! catch(java.io.IOException iox) ! 
{ ! sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); ! close(); ! } ! catch(InterruptedException ie) { ! // return to caller ! return; ! } ! finally { ! releaseReadBuffer(readBuffer); ! } ! ! // process the data ! processData(); } } --- 13,128 ---- private WritableByteChannel out; private ReadableByteChannel in; ! private SelectionKey key; ! /** ! * @param p the MessageProcessor that will handle datagrams as they are read in ! * from the read buffer. ! * @param f the DatagramFactory that will process raw byte streams and perform ! * framing and interpretation. ! */ public ConnectionInfo(IMessageProcessor p, ! IDatagramFactory f) { ! super(p,f); } ! ! /** ! * Sets the selection key for this connection. It is used ! * to intelligently request write callbacks. ! */ ! void setSelectionKey(SelectionKey key) ! { ! this.key = key; ! } ! public void start() { ! shouldProcess = true; } ! public void stop() { ! shouldProcess = false; } ! public ReadableByteChannel in() {return in;} public WritableByteChannel out() {return out;} ! /** ! * Attaches an input and output channel to this connection. They may be ! * the same object, if the channel implements both readable and writable ! * interfaces. ! */ public void attach(ReadableByteChannel in, WritableByteChannel out) { ! this.in = in; ! this.out = out; } ! public int doWrite(ByteBuffer writeBuffer) ! throws java.io.IOException { ! int n = out().write(writeBuffer); ! ! // if we have more bytes, register an interest ! // for write callback with the selection key. ! // if not, explicitly cancel the write callback. ! if (key != null) ! { ! if (writeBuffer.hasRemaining()) ! { ! key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); ! key.selector().wakeup(); ! } ! else ! { ! key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); ! } ! } ! ! return n; } ! /** ! * Reads from the specified byte channel into the read buffer. This method ! * is usually called by a dedicated I/O service thread when the channel ! 
* has indicated that it has data to be consumed. ! */ void readFrom(ReadableByteChannel channel, ! SelectionKey key) { ! ByteBuffer readBuffer = null; ! try ! { ! readBuffer = getReadBuffer(); ! int n = channel.read(readBuffer); ! ! // if were are at End Of Stream, we cancel ! // the read selection key. ! if (n == -1) ! { ! key.cancel(); ! ! // close the channels ! close(); ! } ! } ! catch(java.io.IOException iox) ! { ! sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); ! close(); ! } ! catch(InterruptedException ie) ! { ! // return to caller ! return; ! } ! finally ! { ! releaseReadBuffer(readBuffer); ! } ! ! // process the data ! processData(); } } Index: ReadWriteTransformThread.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/kernel/ReadWriteTransformThread.java,v retrieving revision 1.15 retrieving revision 1.16 diff -C2 -d -r1.15 -r1.16 *** ReadWriteTransformThread.java 14 Jun 2003 18:14:32 -0000 1.15 --- ReadWriteTransformThread.java 14 Jun 2003 19:21:28 -0000 1.16 *************** *** 90,95 **** // register the READ operation for the channel, and // attach the IConnectionInfo to the Key. ! final SelectionKey theKey = channel.register(readSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); theKey.attach(conn); // register a close event handler, --- 90,101 ---- // register the READ operation for the channel, and // attach the IConnectionInfo to the Key. ! final SelectionKey theKey = channel.register(readSelector, SelectionKey.OP_READ); theKey.attach(conn); + + // tell the connection about the new key + if (conn instanceof ConnectionInfo) + { + ((ConnectionInfo)conn).setSelectionKey(theKey); + } // register a close event handler, *************** *** 123,127 **** conn.readFrom(incomingChannel, key); } ! if (key.isWritable()) { conn.requestFlush(); --- 129,134 ---- conn.readFrom(incomingChannel, key); } ! if (key.isWritable() && ! key.isValid()) { conn.requestFlush(); |