[Assorted-commits] SF.net SVN: assorted:[1001] java-reactor/trunk/src/reactor
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-10-07 22:17:10
|
Revision: 1001 http://assorted.svn.sourceforge.net/assorted/?rev=1001&view=rev Author: yangzhang Date: 2008-10-07 22:17:07 +0000 (Tue, 07 Oct 2008) Log Message: ----------- added (limited) writing to tcp; replaced blocking writer with non-blocking writer in test case Modified Paths: -------------- java-reactor/trunk/src/reactor/AbstractHandler.java java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorHandler.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Modified: java-reactor/trunk/src/reactor/AbstractHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-07 21:56:16 UTC (rev 1000) +++ java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-07 22:17:07 UTC (rev 1001) @@ -23,7 +23,7 @@ } @Override - public void handleConnect(Session session, SocketChannel channel) { + public void handleConnect(Session session) { } } Modified: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 21:56:16 UTC (rev 1000) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 22:17:07 UTC (rev 1001) @@ -66,16 +66,15 @@ if (updated > 0) { Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { - System.out.println("here"); if (key.isValid()) { if (key.isReadable()) { - ((Session) key.attachment()).read(key); + ((Session) key.attachment()).handleRead(key); } else if (key.isWritable()) { - ((Session) key.attachment()).write(key); + ((Session) key.attachment()).handleWrite(key); } else if (key.isConnectable()) { - ((Session) key.attachment()).read(key); + ((Session) key.attachment()).handleConnect(key); } else if (key.isAcceptable()) { - ((Session) key.attachment()).read(key); + ((Session) key.attachment()).handleRead(key); } } } Modified: java-reactor/trunk/src/reactor/ReactorHandler.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-07 21:56:16 UTC (rev 1000) +++ java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-07 22:17:07 UTC (rev 1001) @@ -46,6 +46,6 @@ * @param channel * The channel. */ - public void handleConnect(Session session, SocketChannel channel); + public void handleConnect(Session session); } Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 21:56:16 UTC (rev 1000) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 22:17:07 UTC (rev 1001) @@ -4,7 +4,6 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; @@ -139,31 +138,23 @@ }; r.register(SessionType.STREAM, null, serverSa, h); - spawn(new Runnable() { + r.schedule(new Runnable() { public void run() { - Socket s = new Socket(); - try { - Thread.sleep(100); - s.connect(serverSa); - s.getOutputStream().write("asdf".getBytes()); - s.close(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } + r.register(SessionType.STREAM, serverSa, null, + new AbstractHandler() { + @Override + public void handleConnect(Session session) { + System.out.println("connected"); + try { + session.write(ByteBuffer.wrap("asdf" + .getBytes())); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + }); } - }); - // r.schedule(new Runnable() { - // public void run() { - // r.register(SessionType.STREAM, serverSa, null, - // new AbstractHandler() { - // @Override - // public void handleAccept(Session listenerSession, - // SocketChannel src, Session clientSession) { - // System.out.println("connected"); - // } - // }); - // } - // }, 1, TimeUnit.SECONDS); + }, 1, TimeUnit.SECONDS); r.react(); } Modified: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-07 21:56:16 UTC (rev 1000) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-07 22:17:07 UTC (rev 1001) @@ -16,7 +16,8 @@ /** * Represents a "session," which is probably a bad name for what is basically a - * wrapper around a datagram socket. + * wrapper around a TCP or UDP socket. Encapsulates some extra state information + * associated with the socket. * * @author yang * @@ -66,6 +67,7 @@ } else if (localSa == null && remoteSa != null) { SocketChannel ch = SocketChannel.open(); ch.configureBlocking(false); + ch.connect(remoteSa); ch.register(selector, SelectionKey.OP_CONNECT, this); channel = ch; } else { @@ -113,7 +115,7 @@ * socket. * @throws Exception */ - void read(SelectionKey key) throws Exception { + void handleRead(SelectionKey key) throws Exception { loop: while (true) { try { final InetSocketAddress srcSa; @@ -145,7 +147,8 @@ if (numRead == 0) break loop; - // after channel wrote to buf, set lim = pos, then pos = 0 + // after channel wrote to buf, set lim = pos, then pos = + // 0 readBuf.flip(); // callback handler.handleRead(this, srcSa, readBuf); @@ -197,6 +200,16 @@ } } } + + /** + * Handle the event where the socket has just established an outgoing connection. + */ + void handleConnect(SelectionKey key) throws IOException { + boolean result = ((SocketChannel) channel).finishConnect(); + assert result; + handler.handleConnect(this); + key.interestOps(SelectionKey.OP_READ); + } /** * Handle the event where the socket is ready to be written. This is @@ -208,7 +221,7 @@ * socket. * @throws Exception */ - void write(SelectionKey key) throws Exception { + void handleWrite(SelectionKey key) throws Exception { // Write until there's not more data ... while (!pendingWrites.isEmpty()) { ByteBuffer buf = (ByteBuffer) pendingWrites.get(0); @@ -238,6 +251,28 @@ } /** + * Write data to this stream socket. This does not enqueue anything onto the + * internal pendingWrites buffer if the socket's buffer is full, but instead + * triggers an assertion failure. + * + * @param writeBuf + * The data to be written. + */ + public void write(ByteBuffer writeBuf) throws Exception { + switch (type) { + case STREAM: + int bytes = ((SocketChannel) channel).write(writeBuf); + // TODO: don't trigger an assertion failure! + assert bytes == writeBuf.limit(); + break; + case DATAGRAM: + throw new Exception("cannot write() on a UDP session!"); + default: + throw new AssertionError("unhandled session type"); + } + } + + /** * Send messages from this socket. This does not enqueue anything onto the * internal pendingWrites buffer if the socket's buffer is full, but instead * triggers an assertion failure. @@ -250,20 +285,17 @@ */ public void send(ByteBuffer writeBuf, InetSocketAddress dst) throws Exception { - final int bytes; switch (type) { case STREAM: - // XXX - bytes = 0; - break; + throw new Exception("cannot send() on a TCP session!"); case DATAGRAM: - bytes = ((DatagramChannel) channel).send(writeBuf, dst); + int bytes = ((DatagramChannel) channel).send(writeBuf, dst); + // TODO: don't trigger an assertion failure! + assert bytes == writeBuf.limit(); break; default: throw new AssertionError("unhandled session type"); } - // TODO: don't trigger an assertion failure! - assert bytes == writeBuf.limit(); } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |