[Assorted-commits] SF.net SVN: assorted:[997] java-reactor/trunk
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-10-07 08:40:24
|
Revision: 997 http://assorted.svn.sourceforge.net/assorted/?rev=997&view=rev Author: yangzhang Date: 2008-10-07 08:37:43 +0000 (Tue, 07 Oct 2008) Log Message: ----------- making space/preparing for introduction of stream sockets Modified Paths: -------------- java-reactor/trunk/README java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Added Paths: ----------- java-reactor/trunk/src/reactor/SessionType.java Modified: java-reactor/trunk/README =================================================================== --- java-reactor/trunk/README 2008-10-07 08:37:33 UTC (rev 996) +++ java-reactor/trunk/README 2008-10-07 08:37:43 UTC (rev 997) @@ -11,8 +11,23 @@ scheduled executor tasks - these are run in-line (using timeouts to select). Aside from its simplicity, efficiency, and support for scheduled tasks, it also -has no dependencies except for Java 6. Currently only UDP is supported. +has no dependencies except for Java 6. Both TCP and UDP are supported. +Setup +----- + +Requirements: + +- [Java] 6 + +[Java]: http://java.sun.com/ + +Suggested: + +- [SimpleBuild] + +[SimpleBuild]: http://assorted.sf.net/simple-build/ + Usage ----- @@ -31,6 +46,17 @@ [SRON]: https://moo.cmcl.cs.cmu.edu/trac/scaleron [slf4j]: http://www.slf4j.org/ +Changes +------- + +version 0.2, 2008-10-08 + +- added TCP support + +version 0.1, 2008-03-03 + +- initial release; UDP only + Links ----- Modified: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 08:37:33 UTC (rev 996) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 08:37:43 UTC (rev 997) @@ -13,9 +13,9 @@ /** * A simple select-based reactor for event-based asynchronous IO programming. It * supports scheduling of events. - * + * * @author yang - * + * */ public class Reactor { @@ -29,28 +29,29 @@ } /** - * Register a new session (i.e. socket). Currently, remoteSa is ignored. It - * was intended for creating restricted datagram sockets, which have better - * performance but occupy a file descriptor each. - * + * Register a new session (i.e. socket). + * * @param remoteSa - * Ignored. + * If non-null, create a restricted datagram socket, which has + * better performance but occupies a file descriptor. * @param localSa - * The local address and port to send/receive messages on. + * The local address and port to send/receive messages on, or + * null for auto port selection. * @param handler * The handler for events on this socket. * @return */ - public Session register(InetSocketAddress remoteSa, + public Session register(SessionType type, InetSocketAddress remoteSa, InetSocketAddress localSa, ReactorHandler handler) { - Session session = new Session(remoteSa, localSa, handler, selector); + Session session = new Session(type, remoteSa, localSa, handler, + selector); sessions.add(session); return session; } /** * The main reactor loop. This runs until the shutdown method is called. - * + * * @throws Exception */ public void react() throws Exception { @@ -93,7 +94,7 @@ /** * Schedule a task to occur after a delay. - * + * * @param r * The Runnable to be executed. * @param delay @@ -111,7 +112,7 @@ /** * Schedule a task with a fixed delay. - * + * * @param r * The Runnable to be repeatedly executed. * @param initialDelay @@ -136,10 +137,10 @@ /** * Cancel a task. This is used as a callback from ReactorTask. Removes a * task from the tasks queue. - * + * * TODO: remove the return value and assert that the return value of the * remove call is the task itself? - * + * * @param task * The task to be canceled. * @return The result of the queue removal (either the task itself, or null Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 08:37:33 UTC (rev 996) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 08:37:43 UTC (rev 997) @@ -11,99 +11,101 @@ /** * A simple test for the reactor. + * * @author yang - * + * */ public class ReactorTest { - ExecutorService e; + ExecutorService e; - public ReactorTest() { - e = Executors.newCachedThreadPool(); - } + public ReactorTest() { + e = Executors.newCachedThreadPool(); + } - public void spawn(final Runnable r) { - e.submit(new Runnable() { - public void run() { - try { - r.run(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - } + public void spawn(final Runnable r) { + e.submit(new Runnable() { + public void run() { + try { + r.run(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } - private Runnable makeRunnable(final int i) { - return new Runnable() { - public void run() { - System.out.println(i); - } - }; - } + private Runnable makeRunnable(final int i) { + return new Runnable() { + public void run() { + System.out.println(i); + } + }; + } - public void test() throws Exception { - InetAddress localhost = InetAddress.getLocalHost(); - int serverPort = 11111, clientPort = 22222; - final InetSocketAddress serverSa, clientSa; - serverSa = new InetSocketAddress(localhost, serverPort); - clientSa = new InetSocketAddress(localhost, clientPort); + public void test() throws Exception { + InetAddress localhost = InetAddress.getLocalHost(); + int serverPort = 11111, clientPort = 22222; + final InetSocketAddress serverSa, clientSa; + serverSa = new InetSocketAddress(localhost, serverPort); + clientSa = new InetSocketAddress(localhost, clientPort); - final ReactorHandler handler = new ReactorHandler() { - public void handle(Session service, InetSocketAddress src, - ByteBuffer buf) { - System.out.println("received: " + buf); - } - }; + final ReactorHandler handler = new ReactorHandler() { + public void handle(Session service, InetSocketAddress src, + ByteBuffer buf) { + System.out.println("received: " + buf); + } + }; - spawn(new Runnable() { - public void run() { - try { - Reactor r = new Reactor(); - r.register(null, serverSa, handler); + spawn(new Runnable() { + public void run() { + try { + Reactor r = new Reactor(); + r.register(SessionType.DATAGRAM, null, serverSa, handler); - for (int i = 0; i < 10; i++) - r.schedule(makeRunnable(i), 200 * i, - TimeUnit.MILLISECONDS); + for (int i = 0; i < 10; i++) + r.schedule(makeRunnable(i), 200 * i, + TimeUnit.MILLISECONDS); - Thread.sleep(1000); - r.react(); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - }); + Thread.sleep(1000); + r.react(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + }); - spawn(new Runnable() { - public void run() { - try { - Reactor r = new Reactor(); - ByteBuffer writeBuf = ByteBuffer.allocate(5); - Session s = r.register(null, clientSa, handler); - Thread.sleep(2000); - s.send(writeBuf, clientSa); - r.react(); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - }); + spawn(new Runnable() { + public void run() { + try { + Reactor r = new Reactor(); + ByteBuffer writeBuf = ByteBuffer.allocate(5); + Session s = r.register(SessionType.DATAGRAM, null, + clientSa, handler); + Thread.sleep(2000); + s.send(writeBuf, clientSa); + r.react(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + }); - spawn(new Runnable() { - public void run() { - try { - byte[] writeBuf = new byte[] { 0, 1, 2, 3 }; - Thread.sleep(3000); - new DatagramSocket().send(new DatagramPacket(writeBuf, - writeBuf.length, serverSa)); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - }); - } + spawn(new Runnable() { + public void run() { + try { + byte[] writeBuf = new byte[] { 0, 1, 2, 3 }; + Thread.sleep(3000); + new DatagramSocket().send(new DatagramPacket(writeBuf, + writeBuf.length, serverSa)); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + }); + } - public static void main(String args[]) throws Exception { - new ReactorTest().test(); - } + public static void main(String args[]) throws Exception { + new ReactorTest().test(); + } } Modified: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-07 08:37:33 UTC (rev 996) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-07 08:37:43 UTC (rev 997) @@ -5,6 +5,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.ArrayList; @@ -13,37 +14,41 @@ /** * Represents a "session," which is probably a bad name for what is basically a * wrapper around a datagram socket. - * + * * @author yang - * + * */ public class Session { - private final DatagramChannel channel; + private final SelectableChannel channel; private final ReactorHandler handler; private final InetSocketAddress remoteSa; private final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); private final List<ByteBuffer> pendingWrites = new ArrayList<ByteBuffer>(); + private final SessionType type; /** * Construct a new Session object. - * + * * @param remoteSa - * Ignored. + * If non-null, create a restricted datagram socket, which has + * better performance but occupies a file descriptor. * @param localSa - * The local socket address on which to send/receive packets. + * The local address and port to send/receive messages on, or + * null for auto port selection. * @param handler * The handler for events on this socket. * @param selector * The selector that is used in the current reactor. */ - Session(InetSocketAddress remoteSa, InetSocketAddress localSa, - ReactorHandler handler, Selector selector) { + Session(SessionType type, InetSocketAddress remoteSa, + InetSocketAddress localSa, ReactorHandler handler, Selector selector) { this.handler = handler; this.remoteSa = remoteSa; + this.type = type; try { - channel = DatagramChannel.open(); + DatagramChannel channel = DatagramChannel.open(); channel.configureBlocking(false); DatagramSocket socket = channel.socket(); socket.setReuseAddress(true); @@ -53,6 +58,8 @@ channel.connect(remoteSa); channel.register(selector, SelectionKey.OP_READ, this); + + this.channel = channel; } catch (Exception ex) { throw new RuntimeException(ex); } @@ -61,7 +68,7 @@ /** * This is called by the reactor when there is a message to be received. * Reading messages is the priority, so this is done before anything else. - * + * * @param key * The key into the selector's socket set representing our * socket. @@ -70,24 +77,39 @@ void read(SelectionKey key) throws Exception { while (true) { try { - InetSocketAddress srcSa; + final InetSocketAddress srcSa; - if (remoteSa == null) { - srcSa = (InetSocketAddress) channel.receive(readBuf); - } else { - int numRead = channel.read(readBuf); - if (numRead == -1) { - // Remote entity shut the socket down - // cleanly. - // Do - // the same from our end and cancel the - // channel. - key.channel().close(); - key.cancel(); + switch (type) { + default: + srcSa = null; + break; + case STREAM: { + // XXX + // ServerSocketChannel ch = (ServerSocketChannel) channel; + srcSa = null; + break; + } + case DATAGRAM: { + DatagramChannel ch = (DatagramChannel) channel; + if (remoteSa == null) { + srcSa = (InetSocketAddress) ch.receive(readBuf); + } else { + int numRead = ch.read(readBuf); + if (numRead == -1) { + // Remote entity shut the socket down + // cleanly. + // Do + // the same from our end and cancel the + // channel. + key.channel().close(); + key.cancel(); + } + // TODO also handle numRead == 0 + srcSa = remoteSa; } - // TODO also handle numRead == 0 - srcSa = remoteSa; + break; } + } if (srcSa == null) { break; @@ -112,7 +134,7 @@ * Handle the event where the socket is ready to be written. This is * currently not used since the send() method blindly writes immediately to * the socket - which is bad. - * + * * @param key * The key into the selector's socket set that represents our * socket. @@ -122,7 +144,14 @@ // Write until there's not more data ... while (!pendingWrites.isEmpty()) { ByteBuffer buf = (ByteBuffer) pendingWrites.get(0); - channel.write(buf); + switch (type) { + case STREAM: + // XXX + break; + case DATAGRAM: + ((DatagramChannel) channel).write(buf); + break; + } if (buf.remaining() > 0) { // ... or the socket's buffer fills up break; @@ -144,7 +173,7 @@ * Send messages from this socket. This does not enqueue anything onto the * internal pendingWrites buffer if the socket's buffer is full, but instead * triggers an assertion failure. - * + * * @param writeBuf * The packet to be sent. * @param dst @@ -153,13 +182,26 @@ */ public void send(ByteBuffer writeBuf, InetSocketAddress dst) throws Exception { - int bytes = channel.send(writeBuf, dst); + final int bytes; + switch (type) { + case STREAM: + // XXX + bytes = 0; + break; + case DATAGRAM: + bytes = ((DatagramChannel) channel).send(writeBuf, dst); + break; + default: + bytes = 0; + break; + } + // TODO: don't trigger an assertion failure! assert bytes == writeBuf.limit(); } /** * Close the underlying socket. - * + * * @throws Exception */ public void close() throws Exception { Added: java-reactor/trunk/src/reactor/SessionType.java =================================================================== --- java-reactor/trunk/src/reactor/SessionType.java (rev 0) +++ java-reactor/trunk/src/reactor/SessionType.java 2008-10-07 08:37:43 UTC (rev 997) @@ -0,0 +1,7 @@ +package reactor; + +public enum SessionType { + + DATAGRAM, STREAM + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |