[Assorted-commits] SF.net SVN: assorted:[1011] java-reactor/trunk/src/reactor
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-10-09 06:23:43
|
Revision: 1011 http://assorted.svn.sourceforge.net/assorted/?rev=1011&view=rev Author: yangzhang Date: 2008-10-09 06:23:31 +0000 (Thu, 09 Oct 2008) Log Message: ----------- got udp tests working again by adding receive-handling; reorganized tests in general Modified Paths: -------------- java-reactor/trunk/src/reactor/AbstractDatagramHandler.java java-reactor/trunk/src/reactor/AbstractStreamHandler.java java-reactor/trunk/src/reactor/DatagramHandler.java java-reactor/trunk/src/reactor/DatagramSession.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/SocketHandler.java java-reactor/trunk/src/reactor/SocketSession.java java-reactor/trunk/src/reactor/StreamHandler.java java-reactor/trunk/src/reactor/StreamSession.java Modified: java-reactor/trunk/src/reactor/AbstractDatagramHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractDatagramHandler.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/AbstractDatagramHandler.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -13,15 +13,19 @@ @Override public void handleReceive(DatagramSession session, InetSocketAddress src, - ByteBuffer buf) { + ByteBuffer buf) throws Exception { } @Override - public void handleRead(SocketSession session, ByteBuffer buf) { + public void handleRead(SocketSession session, ByteBuffer buf) throws Exception { } @Override - public void handleWrite(SocketSession session) { + public void handleWrite(SocketSession session) throws Exception { } + @Override + public void handleClose(SocketSession session) throws Exception { + } + } Modified: java-reactor/trunk/src/reactor/AbstractStreamHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractStreamHandler.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/AbstractStreamHandler.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -11,15 +11,19 @@ public class AbstractStreamHandler implements StreamHandler { @Override - public void handleRead(SocketSession session, ByteBuffer buf) { + public void handleRead(SocketSession session, ByteBuffer buf) throws Exception { } @Override - public void handleWrite(SocketSession session) { + public void handleWrite(SocketSession session) throws Exception { } @Override - public void handleConnect(StreamSession session) { + public void handleConnect(StreamSession session) throws Exception { } + + @Override + public void handleClose(SocketSession session) throws Exception { + } } Modified: java-reactor/trunk/src/reactor/DatagramHandler.java =================================================================== --- java-reactor/trunk/src/reactor/DatagramHandler.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/DatagramHandler.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -24,6 +24,6 @@ * The received packet. */ public void handleReceive(DatagramSession session, InetSocketAddress src, - ByteBuffer buf); + ByteBuffer buf) throws Exception; } Modified: java-reactor/trunk/src/reactor/DatagramSession.java =================================================================== --- java-reactor/trunk/src/reactor/DatagramSession.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/DatagramSession.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -1,5 +1,6 @@ package reactor; +import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -81,9 +82,48 @@ */ public void send(ByteBuffer writeBuf, InetSocketAddress dst) throws Exception { + writeBuf.rewind(); int bytes = ((DatagramChannel) channel).send(writeBuf, dst); // TODO: don't trigger an assertion failure! assert bytes == writeBuf.limit(); } + /** + * Overrides superclass behavior to support promiscuous receives (calls into + * DatagramHandler.handleReceive()). + */ + @Override + protected boolean read(SelectionKey key, ByteBuffer readBuf) + throws IOException { + if (channel.isConnected()) { + return super.read(key, readBuf); + } else { + final InetSocketAddress srcSa; + try { + srcSa = (InetSocketAddress) channel.receive(readBuf); + if (srcSa == null) { + // Remote entity shut the socket down cleanly. Do + // the same from our end and cancel the channel. + close(); + return false; + } + } catch (IOException ex) { + close(); + return false; + } + + // after channel wrote to buf, set lim = pos, then pos = + // 0 + readBuf.flip(); + // callback + try { + handler.handleReceive(this, srcSa, readBuf); + } catch (Exception ex) { + ex.printStackTrace(); + } + + return true; + } + } + } Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -1,7 +1,5 @@ package reactor; -import java.net.DatagramPacket; -import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -24,26 +22,47 @@ e = Executors.newCachedThreadPool(); } - public void spawn(final Runnable r) { - e.submit(new Runnable() { - public void run() { - try { - r.run(); - } catch (Exception e) { - e.printStackTrace(); - } + abstract class MyRunnable implements Runnable { + public MyRunnable() { + e.submit(this); + } + + public void run() { + try { + myrun(); + } catch (Exception ex) { + ex.printStackTrace(); } - }); + } + + abstract public void myrun() throws Exception; } - private Runnable makeRunnable(final int i) { - return new Runnable() { - public void run() { - System.out.println(i); + public void testScheduler() throws Exception { + // Start the server. + new MyRunnable() { + public void myrun() throws Exception { + Reactor r = new Reactor(); + + for (int i = 0; i < 10; i++) { + final int j = i; + r.schedule(new Runnable() { + public void run() { + System.out.println(j); + } + }, 200 * i, TimeUnit.MILLISECONDS); + } + + r.react(); } }; } + /** + * Run a promiscuous echo server/client, in separate reactors. + * + * @throws Exception + */ public void testUDP() throws Exception { InetAddress localhost = InetAddress.getLocalHost(); int serverPort = 11111, clientPort = 22222; @@ -51,64 +70,54 @@ serverSa = new InetSocketAddress(localhost, serverPort); clientSa = new InetSocketAddress(localhost, clientPort); - final DatagramHandler handler = new AbstractDatagramHandler() { - @Override - public void handleReceive(DatagramSession service, - InetSocketAddress src, ByteBuffer buf) { - System.out.println("received: " + buf); + // Start the server. + new MyRunnable() { + public void myrun() throws Exception { + final Reactor r = new Reactor(); + r.createDatagramSession(null, serverSa, + new AbstractDatagramHandler() { + @Override + public void handleReceive(DatagramSession session, + InetSocketAddress src, ByteBuffer buf) + throws Exception { + System.out.println("server received '" + + Charset.defaultCharset().decode(buf) + + "' from " + src); + session.send(buf, src); + r.shutdown(); + } + }); + r.react(); } }; - // Start the server. - spawn(new Runnable() { - public void run() { - try { - Reactor r = new Reactor(); - r.createDatagramSession(null, serverSa, handler); - - for (int i = 0; i < 10; i++) - r.schedule(makeRunnable(i), 200 * i, - TimeUnit.MILLISECONDS); - - Thread.sleep(1000); - r.react(); - } catch (Exception ex) { - ex.printStackTrace(); - } + // Start the client. + new MyRunnable() { + public void myrun() throws Exception { + final Reactor r = new Reactor(); + DatagramSession s = r.createDatagramSession(null, clientSa, + new AbstractDatagramHandler() { + @Override + public void handleReceive(DatagramSession session, + InetSocketAddress src, ByteBuffer buf) { + System.out.println("client received '" + + Charset.defaultCharset().decode(buf) + + "' from " + src); + r.shutdown(); + } + }); + Thread.sleep(100); + s.send(ByteBuffer.wrap("hello".getBytes()), serverSa); + r.react(); } - }); - -// // Start the client. -// spawn(new Runnable() { -// public void run() { -// try { -// Reactor r = new Reactor(); -// ByteBuffer writeBuf = ByteBuffer.allocate(5); -// DatagramSession s = r.createDatagramSession(null, clientSa, handler); -// Thread.sleep(2000); -// s.send(writeBuf, clientSa); -// r.react(); -// } catch (Exception ex) { -// ex.printStackTrace(); -// } -// } -// }); - - // Start a second client. - spawn(new Runnable() { - public void run() { - try { - byte[] writeBuf = new byte[] { 0, 1, 2, 3 }; - Thread.sleep(3000); - new DatagramSocket().send(new DatagramPacket(writeBuf, - writeBuf.length, serverSa)); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - }); + }; } + /** + * Run an echo server/client, in the same reactor. + * + * @throws Exception + */ public void testTCP() throws Exception { InetAddress localhost = InetAddress.getLocalHost(); int serverPort = 11111; @@ -129,15 +138,23 @@ // CharBuffer charBuffer = decoder.decode(buf); // System.out.println("read: " + // charBuffer.toString()); - System.out.println("echoing: '" - + Charset.forName("us-ascii").decode(buf) + System.out.println("echoing '" + + Charset.defaultCharset().decode(buf) + "'"); session.write(buf); + session.close(); +// r.shutdown(); } catch (Exception ex) { throw new RuntimeException(ex); } } + @Override + public void handleClose(SocketSession session) + throws Exception { + System.out.println("server closed"); + } + }; } @@ -146,43 +163,40 @@ }); - r.schedule(new Runnable() { - public void run() { - try { - r.connect(serverSa, new AbstractStreamHandler() { + r.schedule(new MyRunnable() { + public void myrun() throws Exception { + r.connect(serverSa, new AbstractStreamHandler() { - @Override - public void handleConnect(StreamSession session) { - System.out.println("connected"); - try { - session.write(ByteBuffer.wrap("hello" - .getBytes())); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } + @Override + public void handleConnect(StreamSession session) throws Exception { + System.out.println("connected"); + session.write(ByteBuffer.wrap("hello".getBytes())); + } - @Override - public void handleRead(SocketSession session, - ByteBuffer buf) { - System.out.println("got back: '" - + Charset.forName("us-ascii").decode(buf) - + "'"); - r.shutdown(); - } + @Override + public void handleRead(SocketSession session, ByteBuffer buf) { + System.out + .println("got back: '" + + Charset.defaultCharset().decode( + buf) + "'"); + r.shutdown(); + } + + @Override + public void handleClose(SocketSession session) { + System.out.println("client closed"); + } - }); - } catch (Exception ex) { - throw new RuntimeException(ex); - } + }); } }, 1, TimeUnit.SECONDS); r.react(); } public static void main(String args[]) throws Exception { - new ReactorTest().testUDP(); - // new ReactorTest().testTCP(); + // new ReactorTest().testScheduler(); + // new ReactorTest().testUDP(); + new ReactorTest().testTCP(); } } Modified: java-reactor/trunk/src/reactor/SocketHandler.java =================================================================== --- java-reactor/trunk/src/reactor/SocketHandler.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/SocketHandler.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -12,7 +12,8 @@ * @param buf * The read data. */ - public void handleRead(SocketSession session, ByteBuffer buf); + public void handleRead(SocketSession session, ByteBuffer buf) + throws Exception; /** * Handle the event where the socket is writable. @@ -20,6 +21,17 @@ * @param session * The session that is ready for writing. */ - public void handleWrite(SocketSession session); + public void handleWrite(SocketSession session) throws Exception; + /** + * Handle a socket close. For TCP sockets, either the remote or local socket + * was closed. For UDP sockets, only the local socket could have been closed + * (as there is no "connection"). + * + * @param session + * The session that was closed. + * @throws Exception + */ + public void handleClose(SocketSession session) throws Exception; + } Modified: java-reactor/trunk/src/reactor/SocketSession.java =================================================================== --- java-reactor/trunk/src/reactor/SocketSession.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/SocketSession.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -8,15 +8,58 @@ import java.util.Deque; public abstract class SocketSession { - + abstract protected SocketHandler getHandler(); + abstract protected ByteChannel getChannel(); + abstract protected SelectionKey getKey(); - + private final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); private final Deque<ByteBuffer> pendingWrites = new ArrayDeque<ByteBuffer>(); /** + * Meant to be overriden in subclasses to support alternative read + * procedures. Performs the actual reading and callback. If we get an + * exception from the handler, then just print it and keep on trucking. + * + * @param key + * The selection key for this channel. + * @param readBuf + * The buffer to read into and pass to the handler. + * @return Whether the read loop should continue to iterate. + * @throws IOException + */ + protected boolean read(SelectionKey key, ByteBuffer readBuf) + throws IOException { + try { + int numRead = getChannel().read(readBuf); + if (numRead == -1) { + // Remote entity shut the socket down cleanly. Do + // the same from our end and cancel the channel. + close(); + return false; + } + if (numRead == 0) + return false; + } catch (IOException ex) { + close(); + return false; + } + + // after channel wrote to buf, set lim = pos, then pos = + // 0 + readBuf.flip(); + // callback + try { + getHandler().handleRead(this, readBuf); + } catch (Exception ex) { + ex.printStackTrace(); + } + return true; + } + + /** * This is called by the reactor when there is a message to be received. * Reading messages is the priority, so this is done before anything else. * @@ -26,33 +69,9 @@ * @throws Exception */ void handleRead(SelectionKey key) throws Exception { - while (true) { - try { - int numRead = getChannel().read(readBuf); - if (numRead == -1) { - // Remote entity shut the socket down cleanly. Do - // the same from our end and cancel the channel. - getChannel().close(); - key.cancel(); - break; - } - if (numRead == 0) - break; - - // after channel wrote to buf, set lim = pos, then pos = - // 0 - readBuf.flip(); - // callback - getHandler().handleRead(this, readBuf); - // recycle buffer - readBuf.clear(); - } catch (IOException e) { - // The remote forcibly closed the connection, cancel - // the selection key and close the channel. - key.cancel(); - getChannel().close(); - } - } + while (read(key, readBuf)) + // recycle buffer + readBuf.clear(); } /** @@ -71,7 +90,8 @@ ByteBuffer buf = (ByteBuffer) pendingWrites.getFirst(); int initPos = buf.position(); int bytes = getChannel().write(buf); - assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; + assert buf.remaining() == buf.limit() - bytes + && buf.position() == initPos + bytes; if (buf.remaining() > 0) { // ... or the socket's buffer fills up break; @@ -103,8 +123,8 @@ buf.rewind(); int initPos = buf.position(); int bytes = getChannel().write(buf); - System.out.println("writing " + bytes); - assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; + assert buf.remaining() == buf.limit() - bytes + && buf.position() == initPos + bytes; if (buf.remaining() > 0) { pendingWrites.add(buf); // We're now interested in when the socket is ready for writing. @@ -113,12 +133,21 @@ } /** - * Close the underlying socket. + * Close the underlying socket and call the handleClose() method. If the + * handler throws an Exception, just print it. * * @throws Exception */ - public void close() throws Exception { - getChannel().close(); + public void close() throws IOException { + if (getChannel().isOpen()) { + getKey().cancel(); + getChannel().close(); + try { + getHandler().handleClose(this); + } catch (Exception ex) { + ex.printStackTrace(); + } + } } } Modified: java-reactor/trunk/src/reactor/StreamHandler.java =================================================================== --- java-reactor/trunk/src/reactor/StreamHandler.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/StreamHandler.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -10,6 +10,6 @@ * @param channel * The channel. */ - public void handleConnect(StreamSession session); + public void handleConnect(StreamSession session) throws Exception; } Modified: java-reactor/trunk/src/reactor/StreamSession.java =================================================================== --- java-reactor/trunk/src/reactor/StreamSession.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/StreamSession.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -53,7 +53,11 @@ assert this.key == key; boolean result = channel.finishConnect(); assert result; - handler.handleConnect(this); + try { + handler.handleConnect(this); + } catch (Exception ex) { + ex.printStackTrace(); + } key.interestOps(SelectionKey.OP_READ); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |