[Assorted-commits] SF.net SVN: assorted:[1000] java-reactor/trunk/src/reactor
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-10-07 21:56:31
|
Revision: 1000
          http://assorted.svn.sourceforge.net/assorted/?rev=1000&view=rev
Author:   yangzhang
Date:     2008-10-07 21:56:16 +0000 (Tue, 07 Oct 2008)

Log Message:
-----------
added a (blocking) writer to the test case; removed extraneous empty read (bug existed in both udp and tcp)

Modified Paths:
--------------
    java-reactor/trunk/src/reactor/AbstractHandler.java
    java-reactor/trunk/src/reactor/ReactorHandler.java
    java-reactor/trunk/src/reactor/ReactorTest.java
    java-reactor/trunk/src/reactor/Session.java

Modified: java-reactor/trunk/src/reactor/AbstractHandler.java
===================================================================
--- java-reactor/trunk/src/reactor/AbstractHandler.java  2008-10-07 21:32:33 UTC (rev 999)
+++ java-reactor/trunk/src/reactor/AbstractHandler.java  2008-10-07 21:56:16 UTC (rev 1000)
@@ -13,7 +13,7 @@
 public class AbstractHandler implements ReactorHandler {
     @Override
-    public void handleConnect(Session listenerSession, SocketChannel src,
+    public void handleAccept(Session listenerSession, SocketChannel src,
             Session clientSession) {
     }
@@ -22,4 +22,8 @@
             ByteBuffer buf) {
     }
+    @Override
+    public void handleConnect(Session session, SocketChannel channel) {
+    }
+
 }

Modified: java-reactor/trunk/src/reactor/ReactorHandler.java
===================================================================
--- java-reactor/trunk/src/reactor/ReactorHandler.java  2008-10-07 21:32:33 UTC (rev 999)
+++ java-reactor/trunk/src/reactor/ReactorHandler.java  2008-10-07 21:56:16 UTC (rev 1000)
@@ -26,16 +26,26 @@
             ByteBuffer buf);
     /**
-     * Handle a new TCP connection.
+     * Handle a newly accepted TCP connection.
      *
      * @param listenerSession
-     *            The session (socket) of the write.
+     *            The server session.
      * @param channel
      *            The client channel.
      * @param clientSession
      *            The client session.
      */
-    public void handleConnect(Session listenerSession, SocketChannel channel,
+    public void handleAccept(Session listenerSession, SocketChannel channel,
             Session clientSession);
+    /**
+     * Handle a successful outgoing TCP connection (we're the client).
+     *
+     * @param session
+     *            The session for this socket.
+     * @param channel
+     *            The channel.
+     */
+    public void handleConnect(Session session, SocketChannel channel);
+
 }

Modified: java-reactor/trunk/src/reactor/ReactorTest.java
===================================================================
--- java-reactor/trunk/src/reactor/ReactorTest.java  2008-10-07 21:32:33 UTC (rev 999)
+++ java-reactor/trunk/src/reactor/ReactorTest.java  2008-10-07 21:56:16 UTC (rev 1000)
@@ -4,8 +4,10 @@
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -104,35 +106,69 @@
             }
         });
     }
-
+
     public void testTCP() throws Exception {
         InetAddress localhost = InetAddress.getLocalHost();
-        int serverPort = 11111, clientPort = 22222;
-        final InetSocketAddress serverSa, clientSa;
+        int serverPort = 11111;
+        final InetSocketAddress serverSa;
         serverSa = new InetSocketAddress(localhost, serverPort);
-        clientSa = new InetSocketAddress(localhost, clientPort);
-        Reactor r = new Reactor();
+        final Reactor r = new Reactor();
         ReactorHandler h = new AbstractHandler() {
             @Override
-            public void handleConnect(Session listenerSession,
+            public void handleAccept(Session listenerSession,
                     SocketChannel src, Session clientSession) {
-                System.out.println("connection");
+                System.out.println("accepted");
             }
             @Override
             public void handleRead(Session session, InetSocketAddress src,
                     ByteBuffer buf) {
-                System.out.println("got packet");
+                try {
+                    // Short-hand for:
+                    // Charset charset = Charset.forName("us-ascii");
+                    // CharsetDecoder decoder = charset.newDecoder();
+                    // CharBuffer charBuffer = decoder.decode(buf);
+                    // System.out.println("read: " + charBuffer.toString());
+                    System.out.println("read: '"
+                            + Charset.forName("us-ascii").decode(buf) + "'");
+                } catch (Exception ex) {
+                    throw new RuntimeException(ex);
+                }
             }
-
+
         };
         r.register(SessionType.STREAM, null, serverSa, h);
+        spawn(new Runnable() {
+            public void run() {
+                Socket s = new Socket();
+                try {
+                    Thread.sleep(100);
+                    s.connect(serverSa);
+                    s.getOutputStream().write("asdf".getBytes());
+                    s.close();
+                } catch (Exception ex) {
+                    throw new RuntimeException(ex);
+                }
+            }
+        });
+        // r.schedule(new Runnable() {
+        // public void run() {
+        // r.register(SessionType.STREAM, serverSa, null,
+        // new AbstractHandler() {
+        // @Override
+        // public void handleAccept(Session listenerSession,
+        // SocketChannel src, Session clientSession) {
+        // System.out.println("connected");
+        // }
+        // });
+        // }
+        // }, 1, TimeUnit.SECONDS);
         r.react();
     }
     public static void main(String args[]) throws Exception {
-        //new ReactorTest().testUDP();
+        // new ReactorTest().testUDP();
         new ReactorTest().testTCP();
     }

Modified: java-reactor/trunk/src/reactor/Session.java
===================================================================
--- java-reactor/trunk/src/reactor/Session.java  2008-10-07 21:32:33 UTC (rev 999)
+++ java-reactor/trunk/src/reactor/Session.java  2008-10-07 21:56:16 UTC (rev 1000)
@@ -127,8 +127,8 @@
                         break loop;
                     c.configureBlocking(false);
                     Session session = new Session(handler, c, selector);
-                    SelectionKey k = c.register(selector, SelectionKey.OP_READ, session);
-                    handler.handleConnect(this, c, session);
+                    c.register(selector, SelectionKey.OP_READ, session);
+                    handler.handleAccept(this, c, session);
                 } else {
                     assert channel instanceof SocketChannel;
                     SocketChannel ch = (SocketChannel) channel;
@@ -140,10 +140,17 @@
                         // the same from our end and cancel the channel.
                         key.channel().close();
                         key.cancel();
+                        break loop;
                     }
                     if (numRead == 0)
                         break loop;
+
+                    // after channel wrote to buf, set lim = pos, then pos = 0
+                    readBuf.flip();
+                    // callback
                     handler.handleRead(this, srcSa, readBuf);
+                    // recycle buffer
+                    readBuf.clear();
                 }
                 break;
             }
@@ -161,6 +168,7 @@
                     // channel.
                     key.channel().close();
                     key.cancel();
+                    break loop;
                 }
                 // TODO also handle numRead == 0
                 srcSa = remoteSa;

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|