[Assorted-commits] SF.net SVN: assorted:[1002] java-reactor/trunk/src/reactor
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-10-07 22:36:08
|
Revision: 1002
          http://assorted.svn.sourceforge.net/assorted/?rev=1002&view=rev
Author:   yangzhang
Date:     2008-10-07 22:35:53 +0000 (Tue, 07 Oct 2008)

Log Message:
-----------
fixed buggy writing; added unbounded writes; made the example into an echo server

Modified Paths:
--------------
    java-reactor/trunk/src/reactor/ReactorTest.java
    java-reactor/trunk/src/reactor/Session.java

Modified: java-reactor/trunk/src/reactor/ReactorTest.java
===================================================================
--- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 22:17:07 UTC (rev 1001)
+++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 22:35:53 UTC (rev 1002)
@@ -129,8 +129,9 @@
     // CharsetDecoder decoder = charset.newDecoder();
     // CharBuffer charBuffer = decoder.decode(buf);
     // System.out.println("read: " + charBuffer.toString());
-    System.out.println("read: '"
+    System.out.println("echoing: '"
             + Charset.forName("us-ascii").decode(buf) + "'");
+    session.write(buf);
 } catch (Exception ex) {
     throw new RuntimeException(ex);
 }
@@ -146,12 +147,20 @@
             public void handleConnect(Session session) {
                 System.out.println("connected");
                 try {
-                    session.write(ByteBuffer.wrap("asdf"
+                    session.write(ByteBuffer.wrap("hello"
                             .getBytes()));
                 } catch (Exception ex) {
                     throw new RuntimeException(ex);
                 }
             }
+
+            @Override
+            public void handleRead(Session session,
+                    InetSocketAddress src, ByteBuffer buf) {
+                System.out.println("got back: '"
+                        + Charset.forName("us-ascii").decode(
+                                buf) + "'");
+            }
         });
     }
 }, 1, TimeUnit.SECONDS);

Modified: java-reactor/trunk/src/reactor/Session.java
===================================================================
--- java-reactor/trunk/src/reactor/Session.java 2008-10-07 22:17:07 UTC (rev 1001)
+++ java-reactor/trunk/src/reactor/Session.java 2008-10-07 22:35:53 UTC (rev 1002)
@@ -11,6 +11,7 @@
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -200,9 +201,10 @@
             }
         }
     }
-
+
     /**
-     * Handle the event where the socket has just established an outgoing connection.
+     * Handle the event where the socket has just established an outgoing
+     * connection.
      */
     void handleConnect(SelectionKey key) throws IOException {
         boolean result = ((SocketChannel) channel).finishConnect();
@@ -225,14 +227,9 @@
         // Write until there's not more data ...
         while (!pendingWrites.isEmpty()) {
             ByteBuffer buf = (ByteBuffer) pendingWrites.get(0);
-            switch (type) {
-            case STREAM:
-                // XXX
-                break;
-            case DATAGRAM:
-                ((DatagramChannel) channel).write(buf);
-                break;
-            }
+            int initPos = buf.position();
+            int bytes = ((WritableByteChannel) channel).write(buf);
+            assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes;
             if (buf.remaining() > 0) {
                 // ... or the socket's buffer fills up
                 break;
@@ -245,30 +242,32 @@
             // interested
             // in writing on this socket. Switch back to waiting
             // for
-            // data.
+            // data (remove OP_WRITE).
             key.interestOps(SelectionKey.OP_READ);
         }
     }
 
+    // TODO: use static (fixed) writeBuf (so that it can't change from under our
+    // feet)?
     /**
      * Write data to this stream socket. This does not enqueue anything onto the
      * internal pendingWrites buffer if the socket's buffer is full, but instead
      * triggers an assertion failure.
      *
-     * @param writeBuf
+     * @param buf
      * The data to be written.
      */
-    public void write(ByteBuffer writeBuf) throws Exception {
-        switch (type) {
-        case STREAM:
-            int bytes = ((SocketChannel) channel).write(writeBuf);
-            // TODO: don't trigger an assertion failure!
-            assert bytes == writeBuf.limit();
-            break;
-        case DATAGRAM:
-            throw new Exception("cannot write() on a UDP session!");
-        default:
-            throw new AssertionError("unhandled session type");
+    public void write(ByteBuffer buf) throws Exception {
+        buf.rewind();
+        int initPos = buf.position();
+        int bytes = ((WritableByteChannel) channel).write(buf);
+        System.out.println("writing " + bytes);
+        assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes;
+        if (buf.remaining() > 0) {
+            pendingWrites.add(buf);
+            // We're now interested in when the socket is ready for writing.
+            this.channel.keyFor(selector).interestOps(
+                    SelectionKey.OP_READ | SelectionKey.OP_WRITE);
         }
     }
 

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|