[Assorted-commits] SF.net SVN: assorted:[998] java-reactor/trunk/src/reactor
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-10-07 16:44:32
|
Revision: 998 http://assorted.svn.sourceforge.net/assorted/?rev=998&view=rev Author: yangzhang Date: 2008-10-07 16:44:24 +0000 (Tue, 07 Oct 2008) Log Message: ----------- Modified Paths: -------------- java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorHandler.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Added Paths: ----------- java-reactor/trunk/src/reactor/AbstractHandler.java Added: java-reactor/trunk/src/reactor/AbstractHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-07 16:44:24 UTC (rev 998) @@ -0,0 +1,25 @@ +package reactor; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +/** + * Provides default (no-op) handler methods for all events. + * + * @author yang + * + */ +public class AbstractHandler implements ReactorHandler { + + @Override + public void handleConnect(Session listenerSession, SocketChannel src, + Session clientSession) { + } + + @Override + public void handleRead(Session session, InetSocketAddress src, + ByteBuffer buf) { + } + +} Modified: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 08:37:43 UTC (rev 997) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 16:44:24 UTC (rev 998) @@ -45,7 +45,6 @@ InetSocketAddress localSa, ReactorHandler handler) { Session session = new Session(type, remoteSa, localSa, handler, selector); - sessions.add(session); return session; } Modified: java-reactor/trunk/src/reactor/ReactorHandler.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-07 08:37:43 UTC (rev 997) +++ java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-07 16:44:24 UTC (rev 998) @@ -2,19 +2,19 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; /** - * Handler for events pertaining to a socket (session). Currently there is only - * a single event, which is the reception of a packet. - * + * Handler for events pertaining to a socket (session). + * * @author yang - * + * */ public interface ReactorHandler { /** * Handle a received packet. - * + * * @param session * The session (socket) at which the packet was received. * @param src @@ -22,6 +22,20 @@ * @param buf * The received packet. */ - public void handle(Session session, InetSocketAddress src, ByteBuffer buf); + public void handleRead(Session session, InetSocketAddress src, + ByteBuffer buf); + /** + * Handle a new TCP connection. + * + * @param listenerSession + * The session (socket) of the write. + * @param channel + * The client channel. + * @param clientSession + * The client session. + */ + public void handleConnect(Session listenerSession, SocketChannel channel, + Session clientSession); + } Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 08:37:43 UTC (rev 997) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 16:44:24 UTC (rev 998) @@ -49,8 +49,8 @@ serverSa = new InetSocketAddress(localhost, serverPort); clientSa = new InetSocketAddress(localhost, clientPort); - final ReactorHandler handler = new ReactorHandler() { - public void handle(Session service, InetSocketAddress src, + final ReactorHandler handler = new AbstractHandler() { + public void handleRead(Session service, InetSocketAddress src, ByteBuffer buf) { System.out.println("received: " + buf); } Modified: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-07 08:37:43 UTC (rev 997) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-07 16:44:24 UTC (rev 998) @@ -3,11 +3,15 @@ import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; @@ -48,23 +52,55 @@ this.type = type; try { - DatagramChannel channel = DatagramChannel.open(); - channel.configureBlocking(false); - DatagramSocket socket = channel.socket(); - socket.setReuseAddress(true); - if (localSa != null) - socket.bind(localSa); - if (remoteSa != null) - channel.connect(remoteSa); + switch (type) { + case STREAM: + if (localSa != null && remoteSa == null) { + ServerSocketChannel ch = ServerSocketChannel.open(); + ch.configureBlocking(false); + ServerSocket socket = ch.socket(); + socket.setReuseAddress(true); + socket.bind(localSa); + ch.register(selector, SelectionKey.OP_ACCEPT, this); + channel = ch; + } else { + SocketChannel ch = SocketChannel.open(); + ch.configureBlocking(false); + Socket socket = ch.socket(); + socket.connect(remoteSa); + ch.register(selector, SelectionKey.OP_CONNECT, this); + channel = ch; + } + break; + case DATAGRAM: { + DatagramChannel ch = DatagramChannel.open(); + ch.configureBlocking(false); + DatagramSocket socket = ch.socket(); + socket.setReuseAddress(true); + if (localSa != null) + socket.bind(localSa); + if (remoteSa != null) + ch.connect(remoteSa); - channel.register(selector, SelectionKey.OP_READ, this); + ch.register(selector, SelectionKey.OP_READ, this); - this.channel = channel; + channel = ch; + break; + } + default: + throw new AssertionError("unhandled session type"); + } } catch (Exception ex) { throw new RuntimeException(ex); } } + Session(ReactorHandler handler, SocketChannel channel) { + this.handler = handler; + this.remoteSa = null; + this.channel = channel; + this.type = SessionType.STREAM; + } + /** * This is called by the reactor when there is a message to be received. * Reading messages is the priority, so this is done before anything else. @@ -80,13 +116,26 @@ final InetSocketAddress srcSa; switch (type) { - default: - srcSa = null; - break; case STREAM: { - // XXX - // ServerSocketChannel ch = (ServerSocketChannel) channel; - srcSa = null; + if (channel instanceof ServerSocketChannel) { + ServerSocketChannel ch = (ServerSocketChannel) channel; + SocketChannel c = ch.accept(); + handler.handleConnect(this, c, new Session(handler, c)); + } else { + assert channel instanceof SocketChannel; + SocketChannel ch = (SocketChannel) channel; + srcSa = (InetSocketAddress) ch.socket() + .getRemoteSocketAddress(); + handler.handleRead(this, srcSa, null); + int numRead = ch.read(readBuf); + if (numRead == -1) { + // Remote entity shut the socket down cleanly. Do + // the same from our end and cancel the channel. + key.channel().close(); + key.cancel(); + } + // TODO also handle numRead == 0 + } break; } case DATAGRAM: { @@ -107,20 +156,22 @@ // TODO also handle numRead == 0 srcSa = remoteSa; } + + if (srcSa == null) { + break; + } + + // after channel wrote to buf, set lim = pos, then pos = 0 + readBuf.flip(); + // callback + handler.handleRead(this, srcSa, readBuf); + // recycle buffer + readBuf.clear(); break; } + default: + throw new AssertionError("unhandled session type"); } - - if (srcSa == null) { - break; - } - - // after channel wrote to buf, set lim = pos, then pos = 0 - readBuf.flip(); - // callback - handler.handle(this, srcSa, readBuf); - // recycle buffer - readBuf.clear(); } catch (IOException e) { // The remote forcibly closed the connection, cancel // the selection key and close the channel. @@ -192,8 +243,7 @@ bytes = ((DatagramChannel) channel).send(writeBuf, dst); break; default: - bytes = 0; - break; + throw new AssertionError("unhandled session type"); } // TODO: don't trigger an assertion failure! assert bytes == writeBuf.limit(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |