[Assorted-commits] SF.net SVN: assorted:[999] java-reactor/trunk/src/reactor
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-10-07 21:32:36
|
Revision: 999 http://assorted.svn.sourceforge.net/assorted/?rev=999&view=rev Author: yangzhang Date: 2008-10-07 21:32:33 +0000 (Tue, 07 Oct 2008) Log Message: ----------- checkpoint. accepting new tcp connections works, but things are messy. tcp writes are still not supported (only udp sends as before). Modified Paths: -------------- java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Modified: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 16:44:24 UTC (rev 998) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 21:32:33 UTC (rev 999) @@ -3,8 +3,6 @@ import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.util.ArrayList; -import java.util.List; import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ScheduledFuture; @@ -20,7 +18,6 @@ public class Reactor { private final Selector selector; - private final List<Session> sessions = new ArrayList<Session>(); private boolean doShutdown = false; private final PriorityQueue<ReactorTask> tasks = new PriorityQueue<ReactorTask>(); @@ -69,11 +66,16 @@ if (updated > 0) { Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { + System.out.println("here"); if (key.isValid()) { if (key.isReadable()) { ((Session) key.attachment()).read(key); } else if (key.isWritable()) { ((Session) key.attachment()).write(key); + } else if (key.isConnectable()) { + ((Session) key.attachment()).read(key); + } else if (key.isAcceptable()) { + ((Session) key.attachment()).read(key); } } } Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 16:44:24 UTC (rev 998) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 21:32:33 UTC (rev 999) @@ -5,6 +5,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -42,7 +43,7 @@ }; } - public void test() throws Exception { + public void testUDP() throws Exception { InetAddress localhost = InetAddress.getLocalHost(); int serverPort = 11111, clientPort = 22222; final InetSocketAddress serverSa, clientSa; @@ -103,9 +104,36 @@ } }); } + + public void testTCP() throws Exception { + InetAddress localhost = InetAddress.getLocalHost(); + int serverPort = 11111, clientPort = 22222; + final InetSocketAddress serverSa, clientSa; + serverSa = new InetSocketAddress(localhost, serverPort); + clientSa = new InetSocketAddress(localhost, clientPort); + Reactor r = new Reactor(); + ReactorHandler h = new AbstractHandler() { + @Override + public void handleConnect(Session listenerSession, + SocketChannel src, Session clientSession) { + System.out.println("connection"); + } + + @Override + public void handleRead(Session session, InetSocketAddress src, + ByteBuffer buf) { + System.out.println("got packet"); + } + + }; + r.register(SessionType.STREAM, null, serverSa, h); + r.react(); + } + public static void main(String args[]) throws Exception { - new ReactorTest().test(); + //new ReactorTest().testUDP(); + new ReactorTest().testTCP(); } } Modified: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-07 16:44:24 UTC (rev 998) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-07 21:32:33 UTC (rev 999) @@ -4,7 +4,6 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectableChannel; @@ -30,6 +29,7 @@ private final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); private final List<ByteBuffer> pendingWrites = new ArrayList<ByteBuffer>(); private final SessionType type; + private final Selector selector; /** * Construct a new Session object. @@ -50,6 +50,7 @@ this.handler = handler; this.remoteSa = remoteSa; this.type = type; + this.selector = selector; try { switch (type) { @@ -62,13 +63,14 @@ socket.bind(localSa); ch.register(selector, SelectionKey.OP_ACCEPT, this); channel = ch; - } else { + } else if (localSa == null && remoteSa != null) { SocketChannel ch = SocketChannel.open(); ch.configureBlocking(false); - Socket socket = ch.socket(); - socket.connect(remoteSa); ch.register(selector, SelectionKey.OP_CONNECT, this); channel = ch; + } else { + throw new IllegalArgumentException( + "At least one of remoteSa or localSa must be non-null."); } break; case DATAGRAM: { @@ -94,11 +96,12 @@ } } - Session(ReactorHandler handler, SocketChannel channel) { + Session(ReactorHandler handler, SocketChannel channel, Selector selector) { this.handler = handler; this.remoteSa = null; this.channel = channel; this.type = SessionType.STREAM; + this.selector = selector; } /** @@ -111,7 +114,7 @@ * @throws Exception */ void read(SelectionKey key) throws Exception { - while (true) { + loop: while (true) { try { final InetSocketAddress srcSa; @@ -120,13 +123,17 @@ if (channel instanceof ServerSocketChannel) { ServerSocketChannel ch = (ServerSocketChannel) channel; SocketChannel c = ch.accept(); - handler.handleConnect(this, c, new Session(handler, c)); + if (c == null) + break loop; + c.configureBlocking(false); + Session session = new Session(handler, c, selector); + SelectionKey k = c.register(selector, SelectionKey.OP_READ, session); + handler.handleConnect(this, c, session); } else { assert channel instanceof SocketChannel; SocketChannel ch = (SocketChannel) channel; srcSa = (InetSocketAddress) ch.socket() .getRemoteSocketAddress(); - handler.handleRead(this, srcSa, null); int numRead = ch.read(readBuf); if (numRead == -1) { // Remote entity shut the socket down cleanly. Do @@ -134,7 +141,9 @@ key.channel().close(); key.cancel(); } - // TODO also handle numRead == 0 + if (numRead == 0) + break loop; + handler.handleRead(this, srcSa, readBuf); } break; } @@ -158,7 +167,7 @@ } if (srcSa == null) { - break; + break loop; } // after channel wrote to buf, set lim = pos, then pos = 0 This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |