[Assorted-commits] SF.net SVN: assorted:[1004] java-reactor/trunk/src/reactor
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-10-08 11:22:22
|
Revision: 1004 http://assorted.svn.sourceforge.net/assorted/?rev=1004&view=rev Author: yangzhang Date: 2008-10-08 08:41:08 +0000 (Wed, 08 Oct 2008) Log Message: ----------- completely refactored reactor to distinguish among the various session and handler types for the various types of channels/sockets (tcp listener, tcp connection, udp); got the tcp test working again, but the udp tests are still borked. Modified Paths: -------------- java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorTest.java Added Paths: ----------- java-reactor/trunk/src/reactor/AbstractDatagramHandler.java java-reactor/trunk/src/reactor/AbstractStreamHandler.java java-reactor/trunk/src/reactor/DatagramHandler.java java-reactor/trunk/src/reactor/DatagramSession.java java-reactor/trunk/src/reactor/ListenerHandler.java java-reactor/trunk/src/reactor/ListenerSession.java java-reactor/trunk/src/reactor/SocketHandler.java java-reactor/trunk/src/reactor/SocketSession.java java-reactor/trunk/src/reactor/StreamHandler.java java-reactor/trunk/src/reactor/StreamSession.java Removed Paths: ------------- java-reactor/trunk/src/reactor/AbstractHandler.java java-reactor/trunk/src/reactor/ReactorHandler.java java-reactor/trunk/src/reactor/Session.java java-reactor/trunk/src/reactor/SessionType.java Added: java-reactor/trunk/src/reactor/AbstractDatagramHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractDatagramHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/AbstractDatagramHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,27 @@ +package reactor; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +/** + * Provides default (no-op) handler methods for all events. + * + * @author yang + * + */ +public class AbstractDatagramHandler implements DatagramHandler { + + @Override + public void handleReceive(DatagramSession session, InetSocketAddress src, + ByteBuffer buf) { + } + + @Override + public void handleRead(SocketSession session, ByteBuffer buf) { + } + + @Override + public void handleWrite(SocketSession session) { + } + +} Deleted: java-reactor/trunk/src/reactor/AbstractHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -1,29 +0,0 @@ -package reactor; - -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -/** - * Provides default (no-op) handler methods for all events. - * - * @author yang - * - */ -public class AbstractHandler implements ReactorHandler { - - @Override - public void handleAccept(Session listenerSession, SocketChannel src, - Session clientSession) { - } - - @Override - public void handleRead(Session session, InetSocketAddress src, - ByteBuffer buf) { - } - - @Override - public void handleConnect(Session session) { - } - -} Added: java-reactor/trunk/src/reactor/AbstractStreamHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractStreamHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/AbstractStreamHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,25 @@ +package reactor; + +import java.nio.ByteBuffer; + +/** + * Provides default (no-op) handler methods for all events. + * + * @author yang + * + */ +public class AbstractStreamHandler implements StreamHandler { + + @Override + public void handleRead(SocketSession session, ByteBuffer buf) { + } + + @Override + public void handleWrite(SocketSession session) { + } + + @Override + public void handleConnect(StreamSession session) { + } + +} Added: java-reactor/trunk/src/reactor/DatagramHandler.java =================================================================== --- java-reactor/trunk/src/reactor/DatagramHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/DatagramHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,29 @@ +package reactor; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +/** + * Handler for events pertaining to a socket (session). We separate sessions + * from handlers because we may choose to have multiple sessions all pass their + * events to the same handler. + * + * @author yang + * + */ +public interface DatagramHandler extends SocketHandler { + + /** + * Handle a promiscuously received packet. + * + * @param session + * The session (socket) at which the packet was received. + * @param src + * The sender's socket address. + * @param buf + * The received packet. + */ + public void handleReceive(DatagramSession session, InetSocketAddress src, + ByteBuffer buf); + +} Added: java-reactor/trunk/src/reactor/DatagramSession.java =================================================================== --- java-reactor/trunk/src/reactor/DatagramSession.java (rev 0) +++ java-reactor/trunk/src/reactor/DatagramSession.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,89 @@ +package reactor; + +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; + +/** + * Represents a "session," which is probably a bad name for what is basically a + * wrapper around a TCP or UDP socket. Encapsulates some extra state information + * associated with the socket. + * + * @author yang + * + */ +public class DatagramSession extends SocketSession { + + private final DatagramChannel channel; + private final DatagramHandler handler; + private final SelectionKey key; + + @Override + protected ByteChannel getChannel() { + return channel; + } + + @Override + protected SocketHandler getHandler() { + return handler; + } + + @Override + protected SelectionKey getKey() { + return key; + } + + /** + * Construct a new Session object. + * + * @param selector + * The selector that is used in the current reactor. + * @param remoteSa + * If non-null, create a restricted datagram socket, which has + * better performance but occupies a file descriptor. + * @param localSa + * The local address and port to send/receive messages on, or + * null for auto port selection. + * @param handler + * The handler for events on this socket. + */ + DatagramSession(Selector selector, InetSocketAddress remoteSa, + InetSocketAddress localSa, DatagramHandler handler) + throws Exception { + DatagramChannel ch = DatagramChannel.open(); + ch.configureBlocking(false); + DatagramSocket socket = ch.socket(); + socket.setReuseAddress(true); + if (localSa != null) + socket.bind(localSa); + if (remoteSa != null) + ch.connect(remoteSa); + + key = ch.register(selector, SelectionKey.OP_READ, this); + channel = ch; + this.handler = handler; + } + + /** + * Send messages from this socket, in promiscuous mode. This does not + * enqueue anything onto the internal pendingWrites buffer if the socket's + * buffer is full, but instead triggers an assertion failure. + * + * @param writeBuf + * The packet to be sent. + * @param dst + * The destination socket address. + * @throws Exception + */ + public void send(ByteBuffer writeBuf, InetSocketAddress dst) + throws Exception { + int bytes = ((DatagramChannel) channel).send(writeBuf, dst); + // TODO: don't trigger an assertion failure! + assert bytes == writeBuf.limit(); + } + +} Added: java-reactor/trunk/src/reactor/ListenerHandler.java =================================================================== --- java-reactor/trunk/src/reactor/ListenerHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/ListenerHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,24 @@ +package reactor; + +import java.nio.channels.SocketChannel; + +public interface ListenerHandler { + + /** + * Handle a newly accepted TCP connection. + * + * @param clientChannel + * The client channel. + * + * @return The StreamHandler to handle events of the new connection. + */ + public StreamHandler handleAccept(SocketChannel clientChannel); + + /** + * Handle exceptions that occurred while accepting. + * + * @param ex The exception. + */ + public void handleException(Exception ex); + +} Added: java-reactor/trunk/src/reactor/ListenerSession.java =================================================================== --- java-reactor/trunk/src/reactor/ListenerSession.java (rev 0) +++ java-reactor/trunk/src/reactor/ListenerSession.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,58 @@ +package reactor; + +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +public class ListenerSession { + + private final ServerSocketChannel channel; + private final ListenerHandler handler; + private final Selector selector; + + ListenerSession(Selector selector, InetSocketAddress localSa, + ListenerHandler handler) throws Exception { + ServerSocketChannel ch = ServerSocketChannel.open(); + ch.configureBlocking(false); + ServerSocket socket = ch.socket(); + socket.setReuseAddress(true); + socket.bind(localSa); + ch.register(selector, SelectionKey.OP_ACCEPT, this); + channel = ch; + this.handler = handler; + this.selector = selector; + } + + /** + * Handle a newly accepted TCP connection. + * + * @param key + * The selection key. + */ + public void handleAccept(SelectionKey key) { + while (true) { + try { + SocketChannel c = channel.accept(); + if (c == null) + break; + c.configureBlocking(false); + new StreamSession(selector, c, handler.handleAccept(c)); + } catch (Exception ex) { + handler.handleException(ex); + } + } + } + + /** + * Close the underlying socket. + * + * @throws Exception + */ + public void close() throws Exception { + channel.close(); + } + +} Modified: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -26,7 +26,15 @@ } /** - * Register a new session (i.e. socket). + * Register a new TCP server session. + */ + public ListenerSession registerStreamServer(InetSocketAddress localSa, + ListenerHandler handler) { + return null; + } + + /** + * Register a new datagram session (i.e. socket). * * @param remoteSa * If non-null, create a restricted datagram socket, which has @@ -38,14 +46,42 @@ * The handler for events on this socket. * @return */ - public Session register(SessionType type, InetSocketAddress remoteSa, - InetSocketAddress localSa, ReactorHandler handler) { - Session session = new Session(type, remoteSa, localSa, handler, - selector); - return session; + public DatagramSession createDatagramSession(InetSocketAddress remoteSa, + InetSocketAddress localSa, DatagramHandler handler) + throws Exception { + return new DatagramSession(selector, remoteSa, localSa, handler); } /** + * Create a stream session that is connecting to the given address. + * + * @param remoteSa + * The remote socket address to connect to. + * @param handler + * The event handler for this connection. + * @throws Exception + */ + public void connect(InetSocketAddress remoteSa, StreamHandler handler) + throws Exception { + new StreamSession(selector, remoteSa, handler); + } + + /** + * Create a stream session that listens on the specified local address for + * connections to accept. + * + * @param localSa + * The local socket address to bind to. + * @param handler + * The event handler for this listener. + * @throws Exception + */ + public void listen(InetSocketAddress localSa, ListenerHandler handler) + throws Exception { + new ListenerSession(selector, localSa, handler); + } + + /** * The main reactor loop. This runs until the shutdown method is called. * * @throws Exception @@ -68,13 +104,15 @@ for (SelectionKey key : keys) { if (key.isValid()) { if (key.isReadable()) { - ((Session) key.attachment()).handleRead(key); + ((SocketSession) key.attachment()).handleRead(key); } else if (key.isWritable()) { - ((Session) key.attachment()).handleWrite(key); + ((SocketSession) key.attachment()).handleWrite(key); } else if (key.isConnectable()) { - ((Session) key.attachment()).handleConnect(key); + ((StreamSession) key.attachment()) + .handleConnect(key); } else if (key.isAcceptable()) { - ((Session) key.attachment()).handleRead(key); + ((ListenerSession) key.attachment()) + .handleAccept(key); } } } Deleted: java-reactor/trunk/src/reactor/ReactorHandler.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -1,51 +0,0 @@ -package reactor; - -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -/** - * Handler for events pertaining to a socket (session). - * - * @author yang - * - */ -public interface ReactorHandler { - - /** - * Handle a received packet. - * - * @param session - * The session (socket) at which the packet was received. - * @param src - * The sender's socket address. - * @param buf - * The received packet. - */ - public void handleRead(Session session, InetSocketAddress src, - ByteBuffer buf); - - /** - * Handle a newly accepted TCP connection. - * - * @param listenerSession - * The server session. - * @param channel - * The client channel. - * @param clientSession - * The client session. - */ - public void handleAccept(Session listenerSession, SocketChannel channel, - Session clientSession); - - /** - * Handle a successful outgoing TCP connection (we're the client). - * - * @param session - * The session for this socket. - * @param channel - * The channel. - */ - public void handleConnect(Session session); - -} Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -51,18 +51,20 @@ serverSa = new InetSocketAddress(localhost, serverPort); clientSa = new InetSocketAddress(localhost, clientPort); - final ReactorHandler handler = new AbstractHandler() { - public void handleRead(Session service, InetSocketAddress src, - ByteBuffer buf) { + final DatagramHandler handler = new AbstractDatagramHandler() { + @Override + public void handleReceive(DatagramSession service, + InetSocketAddress src, ByteBuffer buf) { System.out.println("received: " + buf); } }; + // Start the server. spawn(new Runnable() { public void run() { try { Reactor r = new Reactor(); - r.register(SessionType.DATAGRAM, null, serverSa, handler); + r.createDatagramSession(null, serverSa, handler); for (int i = 0; i < 10; i++) r.schedule(makeRunnable(i), 200 * i, @@ -76,22 +78,23 @@ } }); - spawn(new Runnable() { - public void run() { - try { - Reactor r = new Reactor(); - ByteBuffer writeBuf = ByteBuffer.allocate(5); - Session s = r.register(SessionType.DATAGRAM, null, - clientSa, handler); - Thread.sleep(2000); - s.send(writeBuf, clientSa); - r.react(); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - }); +// // Start the client. +// spawn(new Runnable() { +// public void run() { +// try { +// Reactor r = new Reactor(); +// ByteBuffer writeBuf = ByteBuffer.allocate(5); +// DatagramSession s = r.createDatagramSession(null, clientSa, handler); +// Thread.sleep(2000); +// s.send(writeBuf, clientSa); +// r.react(); +// } catch (Exception ex) { +// ex.printStackTrace(); +// } +// } +// }); + // Start a second client. spawn(new Runnable() { public void run() { try { @@ -112,64 +115,74 @@ final InetSocketAddress serverSa; serverSa = new InetSocketAddress(localhost, serverPort); final Reactor r = new Reactor(); - r.register(SessionType.STREAM, null, serverSa, new AbstractHandler() { + r.listen(serverSa, new ListenerHandler() { - @Override - public void handleAccept(Session listenerSession, - SocketChannel src, Session clientSession) { - System.out.println("accepted"); + public StreamHandler handleAccept(SocketChannel clientChannel) { + return new AbstractStreamHandler() { + + @Override + public void handleRead(SocketSession session, ByteBuffer buf) { + try { + // Short-hand for: + // Charset charset = Charset.forName("us-ascii"); + // CharsetDecoder decoder = charset.newDecoder(); + // CharBuffer charBuffer = decoder.decode(buf); + // System.out.println("read: " + + // charBuffer.toString()); + System.out.println("echoing: '" + + Charset.forName("us-ascii").decode(buf) + + "'"); + session.write(buf); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + }; } - @Override - public void handleRead(Session session, InetSocketAddress src, - ByteBuffer buf) { - try { - // Short-hand for: - // Charset charset = Charset.forName("us-ascii"); - // CharsetDecoder decoder = charset.newDecoder(); - // CharBuffer charBuffer = decoder.decode(buf); - // System.out.println("read: " + charBuffer.toString()); - System.out.println("echoing: '" - + Charset.forName("us-ascii").decode(buf) + "'"); - session.write(buf); - } catch (Exception ex) { - throw new RuntimeException(ex); - } + public void handleException(Exception ex) { } }); + r.schedule(new Runnable() { public void run() { - r.register(SessionType.STREAM, serverSa, null, - new AbstractHandler() { - @Override - public void handleConnect(Session session) { - System.out.println("connected"); - try { - session.write(ByteBuffer.wrap("hello" - .getBytes())); - } catch (Exception ex) { - throw new RuntimeException(ex); - } + try { + r.connect(serverSa, new AbstractStreamHandler() { + + @Override + public void handleConnect(StreamSession session) { + System.out.println("connected"); + try { + session.write(ByteBuffer.wrap("hello" + .getBytes())); + } catch (Exception ex) { + throw new RuntimeException(ex); } + } - @Override - public void handleRead(Session session, - InetSocketAddress src, ByteBuffer buf) { - System.out.println("got back: '" - + Charset.forName("us-ascii").decode( - buf) + "'"); - r.shutdown(); - } - }); + @Override + public void handleRead(SocketSession session, + ByteBuffer buf) { + System.out.println("got back: '" + + Charset.forName("us-ascii").decode(buf) + + "'"); + r.shutdown(); + } + + }); + } catch (Exception ex) { + throw new RuntimeException(ex); + } } }, 1, TimeUnit.SECONDS); r.react(); } public static void main(String args[]) throws Exception { - // new ReactorTest().testUDP(); - new ReactorTest().testTCP(); + new ReactorTest().testUDP(); + // new ReactorTest().testTCP(); } } Deleted: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -1,309 +0,0 @@ -package reactor; - -import java.io.IOException; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.nio.channels.WritableByteChannel; -import java.util.ArrayList; -import java.util.List; - -/** - * Represents a "session," which is probably a bad name for what is basically a - * wrapper around a TCP or UDP socket. Encapsulates some extra state information - * associated with the socket. - * - * @author yang - * - */ -public class Session { - - private final SelectableChannel channel; - private final ReactorHandler handler; - private final InetSocketAddress remoteSa; - private final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); - private final List<ByteBuffer> pendingWrites = new ArrayList<ByteBuffer>(); - private final SessionType type; - private final Selector selector; - - /** - * Construct a new Session object. - * - * @param remoteSa - * If non-null, create a restricted datagram socket, which has - * better performance but occupies a file descriptor. - * @param localSa - * The local address and port to send/receive messages on, or - * null for auto port selection. - * @param handler - * The handler for events on this socket. - * @param selector - * The selector that is used in the current reactor. - */ - Session(SessionType type, InetSocketAddress remoteSa, - InetSocketAddress localSa, ReactorHandler handler, Selector selector) { - this.handler = handler; - this.remoteSa = remoteSa; - this.type = type; - this.selector = selector; - - try { - switch (type) { - case STREAM: - if (localSa != null && remoteSa == null) { - ServerSocketChannel ch = ServerSocketChannel.open(); - ch.configureBlocking(false); - ServerSocket socket = ch.socket(); - socket.setReuseAddress(true); - socket.bind(localSa); - ch.register(selector, SelectionKey.OP_ACCEPT, this); - channel = ch; - } else if (localSa == null && remoteSa != null) { - SocketChannel ch = SocketChannel.open(); - ch.configureBlocking(false); - ch.connect(remoteSa); - ch.register(selector, SelectionKey.OP_CONNECT, this); - channel = ch; - } else { - throw new IllegalArgumentException( - "At least one of remoteSa or localSa must be non-null."); - } - break; - case DATAGRAM: { - DatagramChannel ch = DatagramChannel.open(); - ch.configureBlocking(false); - DatagramSocket socket = ch.socket(); - socket.setReuseAddress(true); - if (localSa != null) - socket.bind(localSa); - if (remoteSa != null) - ch.connect(remoteSa); - - ch.register(selector, SelectionKey.OP_READ, this); - - channel = ch; - break; - } - default: - throw new AssertionError("unhandled session type"); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - Session(ReactorHandler handler, SocketChannel channel, Selector selector) { - this.handler = handler; - this.remoteSa = null; - this.channel = channel; - this.type = SessionType.STREAM; - this.selector = selector; - } - - /** - * This is called by the reactor when there is a message to be received. - * Reading messages is the priority, so this is done before anything else. - * - * @param key - * The key into the selector's socket set representing our - * socket. - * @throws Exception - */ - void handleRead(SelectionKey key) throws Exception { - loop: while (true) { - try { - final InetSocketAddress srcSa; - - switch (type) { - case STREAM: { - if (channel instanceof ServerSocketChannel) { - ServerSocketChannel ch = (ServerSocketChannel) channel; - SocketChannel c = ch.accept(); - if (c == null) - break loop; - c.configureBlocking(false); - Session session = new Session(handler, c, selector); - c.register(selector, SelectionKey.OP_READ, session); - handler.handleAccept(this, c, session); - } else { - assert channel instanceof SocketChannel; - SocketChannel ch = (SocketChannel) channel; - srcSa = (InetSocketAddress) ch.socket() - .getRemoteSocketAddress(); - int numRead = ch.read(readBuf); - if (numRead == -1) { - // Remote entity shut the socket down cleanly. Do - // the same from our end and cancel the channel. - key.channel().close(); - key.cancel(); - break loop; - } - if (numRead == 0) - break loop; - - // after channel wrote to buf, set lim = pos, then pos = - // 0 - readBuf.flip(); - // callback - handler.handleRead(this, srcSa, readBuf); - // recycle buffer - readBuf.clear(); - } - break; - } - case DATAGRAM: { - DatagramChannel ch = (DatagramChannel) channel; - if (remoteSa == null) { - srcSa = (InetSocketAddress) ch.receive(readBuf); - } else { - int numRead = ch.read(readBuf); - if (numRead == -1) { - // Remote entity shut the socket down - // cleanly. - // Do - // the same from our end and cancel the - // channel. - key.channel().close(); - key.cancel(); - break loop; - } - // TODO also handle numRead == 0 - srcSa = remoteSa; - } - - if (srcSa == null) { - break loop; - } - - // after channel wrote to buf, set lim = pos, then pos = 0 - readBuf.flip(); - // callback - handler.handleRead(this, srcSa, readBuf); - // recycle buffer - readBuf.clear(); - break; - } - default: - throw new AssertionError("unhandled session type"); - } - } catch (IOException e) { - // The remote forcibly closed the connection, cancel - // the selection key and close the channel. - key.cancel(); - channel.close(); - } - } - } - - /** - * Handle the event where the socket has just established an outgoing - * connection. - */ - void handleConnect(SelectionKey key) throws IOException { - boolean result = ((SocketChannel) channel).finishConnect(); - assert result; - handler.handleConnect(this); - key.interestOps(SelectionKey.OP_READ); - } - - /** - * Handle the event where the socket is ready to be written. This is - * currently not used since the send() method blindly writes immediately to - * the socket - which is bad. - * - * @param key - * The key into the selector's socket set that represents our - * socket. - * @throws Exception - */ - void handleWrite(SelectionKey key) throws Exception { - // Write until there's not more data ... - while (!pendingWrites.isEmpty()) { - ByteBuffer buf = (ByteBuffer) pendingWrites.get(0); - int initPos = buf.position(); - int bytes = ((WritableByteChannel) channel).write(buf); - assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; - if (buf.remaining() > 0) { - // ... or the socket's buffer fills up - break; - } - pendingWrites.remove(0); - } - - if (pendingWrites.isEmpty()) { - // We wrote away all data, so we're no longer - // interested - // in writing on this socket. Switch back to waiting - // for - // data (remove OP_WRITE). - key.interestOps(SelectionKey.OP_READ); - } - } - - // TODO: use static (fixed) writeBuf (so that it can't change from under our - // feet)? - /** - * Write data to this stream socket. This does not enqueue anything onto the - * internal pendingWrites buffer if the socket's buffer is full, but instead - * triggers an assertion failure. - * - * @param buf - * The data to be written. - */ - public void write(ByteBuffer buf) throws Exception { - buf.rewind(); - int initPos = buf.position(); - int bytes = ((WritableByteChannel) channel).write(buf); - System.out.println("writing " + bytes); - assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; - if (buf.remaining() > 0) { - pendingWrites.add(buf); - // We're now interested in when the socket is ready for writing. - this.channel.keyFor(selector).interestOps( - SelectionKey.OP_READ | SelectionKey.OP_WRITE); - } - } - - /** - * Send messages from this socket. This does not enqueue anything onto the - * internal pendingWrites buffer if the socket's buffer is full, but instead - * triggers an assertion failure. - * - * @param writeBuf - * The packet to be sent. - * @param dst - * The destination socket address. - * @throws Exception - */ - public void send(ByteBuffer writeBuf, InetSocketAddress dst) - throws Exception { - switch (type) { - case STREAM: - throw new Exception("cannot send() on a TCP session!"); - case DATAGRAM: - int bytes = ((DatagramChannel) channel).send(writeBuf, dst); - // TODO: don't trigger an assertion failure! - assert bytes == writeBuf.limit(); - break; - default: - throw new AssertionError("unhandled session type"); - } - } - - /** - * Close the underlying socket. - * - * @throws Exception - */ - public void close() throws Exception { - channel.close(); - } - -} Deleted: java-reactor/trunk/src/reactor/SessionType.java =================================================================== --- java-reactor/trunk/src/reactor/SessionType.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/SessionType.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -1,7 +0,0 @@ -package reactor; - -public enum SessionType { - - DATAGRAM, STREAM - -} Added: java-reactor/trunk/src/reactor/SocketHandler.java =================================================================== --- java-reactor/trunk/src/reactor/SocketHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/SocketHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,25 @@ +package reactor; + +import java.nio.ByteBuffer; + +public interface SocketHandler { + + /** + * Handle a read. + * + * @param session + * The session (socket) at which the packet was received. + * @param buf + * The read data. + */ + public void handleRead(SocketSession session, ByteBuffer buf); + + /** + * Handle the event where the socket is writable. + * + * @param session + * The session that is ready for writing. + */ + public void handleWrite(SocketSession session); + +} Added: java-reactor/trunk/src/reactor/SocketSession.java =================================================================== --- java-reactor/trunk/src/reactor/SocketSession.java (rev 0) +++ java-reactor/trunk/src/reactor/SocketSession.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,124 @@ +package reactor; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.SelectionKey; +import java.util.ArrayDeque; +import java.util.Deque; + +public abstract class SocketSession { + + abstract protected SocketHandler getHandler(); + abstract protected ByteChannel getChannel(); + abstract protected SelectionKey getKey(); + + private final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); + private final Deque<ByteBuffer> pendingWrites = new ArrayDeque<ByteBuffer>(); + + /** + * This is called by the reactor when there is a message to be received. + * Reading messages is the priority, so this is done before anything else. + * + * @param key + * The key into the selector's socket set representing our + * socket. + * @throws Exception + */ + void handleRead(SelectionKey key) throws Exception { + while (true) { + try { + int numRead = getChannel().read(readBuf); + if (numRead == -1) { + // Remote entity shut the socket down cleanly. Do + // the same from our end and cancel the channel. + getChannel().close(); + key.cancel(); + break; + } + if (numRead == 0) + break; + + // after channel wrote to buf, set lim = pos, then pos = + // 0 + readBuf.flip(); + // callback + getHandler().handleRead(this, readBuf); + // recycle buffer + readBuf.clear(); + } catch (IOException e) { + // The remote forcibly closed the connection, cancel + // the selection key and close the channel. + key.cancel(); + getChannel().close(); + } + } + } + + /** + * Handle the event where the socket is ready to be written. This is + * currently not used since the send() method blindly writes immediately to + * the socket - which is bad. + * + * @param key + * The key into the selector's socket set that represents our + * socket. + * @throws Exception + */ + void handleWrite(SelectionKey key) throws Exception { + // Write until there's not more data ... + while (!pendingWrites.isEmpty()) { + ByteBuffer buf = (ByteBuffer) pendingWrites.getFirst(); + int initPos = buf.position(); + int bytes = getChannel().write(buf); + assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; + if (buf.remaining() > 0) { + // ... or the socket's buffer fills up + break; + } + pendingWrites.removeFirst(); + } + + if (pendingWrites.isEmpty()) { + // We wrote away all data, so we're no longer + // interested + // in writing on this socket. Switch back to waiting + // for + // data (remove OP_WRITE). + key.interestOps(SelectionKey.OP_READ); + } + } + + // TODO: use static (fixed) writeBuf (so that it can't change from under our + // feet)? + /** + * Write data to this stream socket. This does not enqueue anything onto the + * internal pendingWrites buffer if the socket's buffer is full, but instead + * triggers an assertion failure. + * + * @param buf + * The data to be written. + */ + public void write(ByteBuffer buf) throws Exception { + buf.rewind(); + int initPos = buf.position(); + int bytes = getChannel().write(buf); + System.out.println("writing " + bytes); + assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; + if (buf.remaining() > 0) { + pendingWrites.add(buf); + // We're now interested in when the socket is ready for writing. + getKey().interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); + } + } + + /** + * Close the underlying socket. + * + * @throws Exception + */ + public void close() throws Exception { + getChannel().close(); + } + +} Added: java-reactor/trunk/src/reactor/StreamHandler.java =================================================================== --- java-reactor/trunk/src/reactor/StreamHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/StreamHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,15 @@ +package reactor; + +public interface StreamHandler extends SocketHandler { + + /** + * Handle a successful outgoing TCP connection (we're the client). + * + * @param session + * The session for this socket. + * @param channel + * The channel. + */ + public void handleConnect(StreamSession session); + +} Added: java-reactor/trunk/src/reactor/StreamSession.java =================================================================== --- java-reactor/trunk/src/reactor/StreamSession.java (rev 0) +++ java-reactor/trunk/src/reactor/StreamSession.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,60 @@ +package reactor; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.ByteChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; + +public class StreamSession extends SocketSession { + + private final SocketChannel channel; + private final StreamHandler handler; + private final SelectionKey key; + + @Override + protected ByteChannel getChannel() { + return channel; + } + + @Override + protected SocketHandler getHandler() { + return handler; + } + + @Override + protected SelectionKey getKey() { + return key; + } + + public StreamSession(Selector selector, InetSocketAddress remoteSa, + StreamHandler handler) throws Exception { + SocketChannel ch = SocketChannel.open(); + ch.configureBlocking(false); + ch.connect(remoteSa); + key = ch.register(selector, SelectionKey.OP_CONNECT, this); + this.handler = handler; + channel = ch; + } + + StreamSession(Selector selector, SocketChannel ch, StreamHandler handler) + throws Exception { + key = ch.register(selector, SelectionKey.OP_READ, this); + this.handler = handler; + channel = ch; + } + + /** + * Handle the event where the socket has just established an outgoing + * connection. + */ + void handleConnect(SelectionKey key) throws IOException { + assert this.key == key; + boolean result = channel.finishConnect(); + assert result; + handler.handleConnect(this); + key.interestOps(SelectionKey.OP_READ); + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |