assorted-commits Mailing List for Assorted projects (Page 37)
Brought to you by:
yangzhang
You can subscribe to this list here.
2007 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
(9) |
Dec
(12) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2008 |
Jan
(86) |
Feb
(265) |
Mar
(96) |
Apr
(47) |
May
(136) |
Jun
(28) |
Jul
(57) |
Aug
(42) |
Sep
(20) |
Oct
(67) |
Nov
(37) |
Dec
(34) |
2009 |
Jan
(39) |
Feb
(85) |
Mar
(96) |
Apr
(24) |
May
(82) |
Jun
(13) |
Jul
(10) |
Aug
(8) |
Sep
(2) |
Oct
(20) |
Nov
(31) |
Dec
(17) |
2010 |
Jan
(16) |
Feb
(11) |
Mar
(17) |
Apr
(53) |
May
(31) |
Jun
(13) |
Jul
(3) |
Aug
(6) |
Sep
(11) |
Oct
(4) |
Nov
(17) |
Dec
(17) |
2011 |
Jan
(3) |
Feb
(19) |
Mar
(5) |
Apr
(17) |
May
(3) |
Jun
(4) |
Jul
(14) |
Aug
(3) |
Sep
(2) |
Oct
(1) |
Nov
(3) |
Dec
(2) |
2012 |
Jan
(3) |
Feb
(7) |
Mar
(1) |
Apr
|
May
(1) |
Jun
|
Jul
(4) |
Aug
(5) |
Sep
(2) |
Oct
(3) |
Nov
|
Dec
|
2013 |
Jan
|
Feb
|
Mar
(9) |
Apr
(5) |
May
|
Jun
(2) |
Jul
(1) |
Aug
(10) |
Sep
(1) |
Oct
(2) |
Nov
|
Dec
|
2014 |
Jan
(1) |
Feb
(3) |
Mar
(3) |
Apr
(1) |
May
(4) |
Jun
|
Jul
|
Aug
|
Sep
(2) |
Oct
|
Nov
|
Dec
|
2015 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
(1) |
Nov
|
Dec
|
2016 |
Jan
(1) |
Feb
|
Mar
(2) |
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
(1) |
Oct
|
Nov
|
Dec
|
2017 |
Jan
|
Feb
|
Mar
(1) |
Apr
|
May
(5) |
Jun
(1) |
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
(2) |
2018 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
(1) |
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: <yan...@us...> - 2008-10-07 22:36:08
|
Revision: 1002 http://assorted.svn.sourceforge.net/assorted/?rev=1002&view=rev Author: yangzhang Date: 2008-10-07 22:35:53 +0000 (Tue, 07 Oct 2008) Log Message: ----------- fixed buggy writing; added unbounded writes; made the example into an echo server Modified Paths: -------------- java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 22:17:07 UTC (rev 1001) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 22:35:53 UTC (rev 1002) @@ -129,8 +129,9 @@ // CharsetDecoder decoder = charset.newDecoder(); // CharBuffer charBuffer = decoder.decode(buf); // System.out.println("read: " + charBuffer.toString()); - System.out.println("read: '" + System.out.println("echoing: '" + Charset.forName("us-ascii").decode(buf) + "'"); + session.write(buf); } catch (Exception ex) { throw new RuntimeException(ex); } @@ -146,12 +147,20 @@ public void handleConnect(Session session) { System.out.println("connected"); try { - session.write(ByteBuffer.wrap("asdf" + session.write(ByteBuffer.wrap("hello" .getBytes())); } catch (Exception ex) { throw new RuntimeException(ex); } } + + @Override + public void handleRead(Session session, + InetSocketAddress src, ByteBuffer buf) { + System.out.println("got back: '" + + Charset.forName("us-ascii").decode( + buf) + "'"); + } }); } }, 1, TimeUnit.SECONDS); Modified: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-07 22:17:07 UTC (rev 1001) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-07 22:35:53 UTC (rev 1002) @@ -11,6 +11,7 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.List; @@ -200,9 +201,10 @@ } } } - + /** - * Handle the event where the socket has just established an outgoing connection. + * Handle the event where the socket has just established an outgoing + * connection. */ void handleConnect(SelectionKey key) throws IOException { boolean result = ((SocketChannel) channel).finishConnect(); @@ -225,14 +227,9 @@ // Write until there's not more data ... while (!pendingWrites.isEmpty()) { ByteBuffer buf = (ByteBuffer) pendingWrites.get(0); - switch (type) { - case STREAM: - // XXX - break; - case DATAGRAM: - ((DatagramChannel) channel).write(buf); - break; - } + int initPos = buf.position(); + int bytes = ((WritableByteChannel) channel).write(buf); + assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; if (buf.remaining() > 0) { // ... or the socket's buffer fills up break; @@ -245,30 +242,32 @@ // interested // in writing on this socket. Switch back to waiting // for - // data. + // data (remove OP_WRITE). key.interestOps(SelectionKey.OP_READ); } } + // TODO: use static (fixed) writeBuf (so that it can't change from under our + // feet)? /** * Write data to this stream socket. This does not enqueue anything onto the * internal pendingWrites buffer if the socket's buffer is full, but instead * triggers an assertion failure. * - * @param writeBuf + * @param buf * The data to be written. */ - public void write(ByteBuffer writeBuf) throws Exception { - switch (type) { - case STREAM: - int bytes = ((SocketChannel) channel).write(writeBuf); - // TODO: don't trigger an assertion failure! - assert bytes == writeBuf.limit(); - break; - case DATAGRAM: - throw new Exception("cannot write() on a UDP session!"); - default: - throw new AssertionError("unhandled session type"); + public void write(ByteBuffer buf) throws Exception { + buf.rewind(); + int initPos = buf.position(); + int bytes = ((WritableByteChannel) channel).write(buf); + System.out.println("writing " + bytes); + assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; + if (buf.remaining() > 0) { + pendingWrites.add(buf); + // We're now interested in when the socket is ready for writing. + this.channel.keyFor(selector).interestOps( + SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-07 22:17:10
|
Revision: 1001 http://assorted.svn.sourceforge.net/assorted/?rev=1001&view=rev Author: yangzhang Date: 2008-10-07 22:17:07 +0000 (Tue, 07 Oct 2008) Log Message: ----------- added (limited) writing to tcp; replaced blocking writer with non-blocking writer in test case Modified Paths: -------------- java-reactor/trunk/src/reactor/AbstractHandler.java java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorHandler.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Modified: java-reactor/trunk/src/reactor/AbstractHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-07 21:56:16 UTC (rev 1000) +++ java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-07 22:17:07 UTC (rev 1001) @@ -23,7 +23,7 @@ } @Override - public void handleConnect(Session session, SocketChannel channel) { + public void handleConnect(Session session) { } } Modified: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 21:56:16 UTC (rev 1000) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 22:17:07 UTC (rev 1001) @@ -66,16 +66,15 @@ if (updated > 0) { Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { - System.out.println("here"); if (key.isValid()) { if (key.isReadable()) { - ((Session) key.attachment()).read(key); + ((Session) key.attachment()).handleRead(key); } else if (key.isWritable()) { - ((Session) key.attachment()).write(key); + ((Session) key.attachment()).handleWrite(key); } else if (key.isConnectable()) { - ((Session) key.attachment()).read(key); + ((Session) key.attachment()).handleConnect(key); } else if (key.isAcceptable()) { - ((Session) key.attachment()).read(key); + ((Session) key.attachment()).handleRead(key); } } } Modified: java-reactor/trunk/src/reactor/ReactorHandler.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-07 21:56:16 UTC (rev 1000) +++ java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-07 22:17:07 UTC (rev 1001) @@ -46,6 +46,6 @@ * @param channel * The channel. */ - public void handleConnect(Session session, SocketChannel channel); + public void handleConnect(Session session); } Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 21:56:16 UTC (rev 1000) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 22:17:07 UTC (rev 1001) @@ -4,7 +4,6 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; @@ -139,31 +138,23 @@ }; r.register(SessionType.STREAM, null, serverSa, h); - spawn(new Runnable() { + r.schedule(new Runnable() { public void run() { - Socket s = new Socket(); - try { - Thread.sleep(100); - s.connect(serverSa); - s.getOutputStream().write("asdf".getBytes()); - s.close(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } + r.register(SessionType.STREAM, serverSa, null, + new AbstractHandler() { + @Override + public void handleConnect(Session session) { + System.out.println("connected"); + try { + session.write(ByteBuffer.wrap("asdf" + .getBytes())); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + }); } - }); - // r.schedule(new Runnable() { - // public void run() { - // r.register(SessionType.STREAM, serverSa, null, - // new AbstractHandler() { - // @Override - // public void handleAccept(Session listenerSession, - // SocketChannel src, Session clientSession) { - // System.out.println("connected"); - // } - // }); - // } - // }, 1, TimeUnit.SECONDS); + }, 1, TimeUnit.SECONDS); r.react(); } Modified: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-07 21:56:16 UTC (rev 1000) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-07 22:17:07 UTC (rev 1001) @@ -16,7 +16,8 @@ /** * Represents a "session," which is probably a bad name for what is basically a - * wrapper around a datagram socket. + * wrapper around a TCP or UDP socket. Encapsulates some extra state information + * associated with the socket. * * @author yang * @@ -66,6 +67,7 @@ } else if (localSa == null && remoteSa != null) { SocketChannel ch = SocketChannel.open(); ch.configureBlocking(false); + ch.connect(remoteSa); ch.register(selector, SelectionKey.OP_CONNECT, this); channel = ch; } else { @@ -113,7 +115,7 @@ * socket. * @throws Exception */ - void read(SelectionKey key) throws Exception { + void handleRead(SelectionKey key) throws Exception { loop: while (true) { try { final InetSocketAddress srcSa; @@ -145,7 +147,8 @@ if (numRead == 0) break loop; - // after channel wrote to buf, set lim = pos, then pos = 0 + // after channel wrote to buf, set lim = pos, then pos = + // 0 readBuf.flip(); // callback handler.handleRead(this, srcSa, readBuf); @@ -197,6 +200,16 @@ } } } + + /** + * Handle the event where the socket has just established an outgoing connection. + */ + void handleConnect(SelectionKey key) throws IOException { + boolean result = ((SocketChannel) channel).finishConnect(); + assert result; + handler.handleConnect(this); + key.interestOps(SelectionKey.OP_READ); + } /** * Handle the event where the socket is ready to be written. This is @@ -208,7 +221,7 @@ * socket. * @throws Exception */ - void write(SelectionKey key) throws Exception { + void handleWrite(SelectionKey key) throws Exception { // Write until there's not more data ... while (!pendingWrites.isEmpty()) { ByteBuffer buf = (ByteBuffer) pendingWrites.get(0); @@ -238,6 +251,28 @@ } /** + * Write data to this stream socket. This does not enqueue anything onto the + * internal pendingWrites buffer if the socket's buffer is full, but instead + * triggers an assertion failure. + * + * @param writeBuf + * The data to be written. + */ + public void write(ByteBuffer writeBuf) throws Exception { + switch (type) { + case STREAM: + int bytes = ((SocketChannel) channel).write(writeBuf); + // TODO: don't trigger an assertion failure! + assert bytes == writeBuf.limit(); + break; + case DATAGRAM: + throw new Exception("cannot write() on a UDP session!"); + default: + throw new AssertionError("unhandled session type"); + } + } + + /** * Send messages from this socket. This does not enqueue anything onto the * internal pendingWrites buffer if the socket's buffer is full, but instead * triggers an assertion failure. @@ -250,20 +285,17 @@ */ public void send(ByteBuffer writeBuf, InetSocketAddress dst) throws Exception { - final int bytes; switch (type) { case STREAM: - // XXX - bytes = 0; - break; + throw new Exception("cannot send() on a TCP session!"); case DATAGRAM: - bytes = ((DatagramChannel) channel).send(writeBuf, dst); + int bytes = ((DatagramChannel) channel).send(writeBuf, dst); + // TODO: don't trigger an assertion failure! + assert bytes == writeBuf.limit(); break; default: throw new AssertionError("unhandled session type"); } - // TODO: don't trigger an assertion failure! - assert bytes == writeBuf.limit(); } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-07 21:56:31
|
Revision: 1000 http://assorted.svn.sourceforge.net/assorted/?rev=1000&view=rev Author: yangzhang Date: 2008-10-07 21:56:16 +0000 (Tue, 07 Oct 2008) Log Message: ----------- added a (blocking) writer to the test case; removed extraneous empty read (bug existed in both udp and tcp) Modified Paths: -------------- java-reactor/trunk/src/reactor/AbstractHandler.java java-reactor/trunk/src/reactor/ReactorHandler.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Modified: java-reactor/trunk/src/reactor/AbstractHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-07 21:32:33 UTC (rev 999) +++ java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-07 21:56:16 UTC (rev 1000) @@ -13,7 +13,7 @@ public class AbstractHandler implements ReactorHandler { @Override - public void handleConnect(Session listenerSession, SocketChannel src, + public void handleAccept(Session listenerSession, SocketChannel src, Session clientSession) { } @@ -22,4 +22,8 @@ ByteBuffer buf) { } + @Override + public void handleConnect(Session session, SocketChannel channel) { + } + } Modified: java-reactor/trunk/src/reactor/ReactorHandler.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-07 21:32:33 UTC (rev 999) +++ java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-07 21:56:16 UTC (rev 1000) @@ -26,16 +26,26 @@ ByteBuffer buf); /** - * Handle a new TCP connection. + * Handle a newly accepted TCP connection. * * @param listenerSession - * The session (socket) of the write. + * The server session. * @param channel * The client channel. * @param clientSession * The client session. */ - public void handleConnect(Session listenerSession, SocketChannel channel, + public void handleAccept(Session listenerSession, SocketChannel channel, Session clientSession); + /** + * Handle a successful outgoing TCP connection (we're the client). + * + * @param session + * The session for this socket. + * @param channel + * The channel. + */ + public void handleConnect(Session session, SocketChannel channel); + } Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 21:32:33 UTC (rev 999) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 21:56:16 UTC (rev 1000) @@ -4,8 +4,10 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -104,35 +106,69 @@ } }); } - + public void testTCP() throws Exception { InetAddress localhost = InetAddress.getLocalHost(); - int serverPort = 11111, clientPort = 22222; - final InetSocketAddress serverSa, clientSa; + int serverPort = 11111; + final InetSocketAddress serverSa; serverSa = new InetSocketAddress(localhost, serverPort); - clientSa = new InetSocketAddress(localhost, clientPort); - Reactor r = new Reactor(); + final Reactor r = new Reactor(); ReactorHandler h = new AbstractHandler() { @Override - public void handleConnect(Session listenerSession, + public void handleAccept(Session listenerSession, SocketChannel src, Session clientSession) { - System.out.println("connection"); + System.out.println("accepted"); } @Override public void handleRead(Session session, InetSocketAddress src, ByteBuffer buf) { - System.out.println("got packet"); + try { + // Short-hand for: + // Charset charset = Charset.forName("us-ascii"); + // CharsetDecoder decoder = charset.newDecoder(); + // CharBuffer charBuffer = decoder.decode(buf); + // System.out.println("read: " + charBuffer.toString()); + System.out.println("read: '" + + Charset.forName("us-ascii").decode(buf) + "'"); + } catch (Exception ex) { + throw new RuntimeException(ex); + } } - + }; r.register(SessionType.STREAM, null, serverSa, h); + spawn(new Runnable() { + public void run() { + Socket s = new Socket(); + try { + Thread.sleep(100); + s.connect(serverSa); + s.getOutputStream().write("asdf".getBytes()); + s.close(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + }); + // r.schedule(new Runnable() { + // public void run() { + // r.register(SessionType.STREAM, serverSa, null, + // new AbstractHandler() { + // @Override + // public void handleAccept(Session listenerSession, + // SocketChannel src, Session clientSession) { + // System.out.println("connected"); + // } + // }); + // } + // }, 1, TimeUnit.SECONDS); r.react(); } public static void main(String args[]) throws Exception { - //new ReactorTest().testUDP(); + // new ReactorTest().testUDP(); new ReactorTest().testTCP(); } Modified: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-07 21:32:33 UTC (rev 999) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-07 21:56:16 UTC (rev 1000) @@ -127,8 +127,8 @@ break loop; c.configureBlocking(false); Session session = new Session(handler, c, selector); - SelectionKey k = c.register(selector, SelectionKey.OP_READ, session); - handler.handleConnect(this, c, session); + c.register(selector, SelectionKey.OP_READ, session); + handler.handleAccept(this, c, session); } else { assert channel instanceof SocketChannel; SocketChannel ch = (SocketChannel) channel; @@ -140,10 +140,17 @@ // the same from our end and cancel the channel. key.channel().close(); key.cancel(); + break loop; } if (numRead == 0) break loop; + + // after channel wrote to buf, set lim = pos, then pos = 0 + readBuf.flip(); + // callback handler.handleRead(this, srcSa, readBuf); + // recycle buffer + readBuf.clear(); } break; } @@ -161,6 +168,7 @@ // channel. key.channel().close(); key.cancel(); + break loop; } // TODO also handle numRead == 0 srcSa = remoteSa; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-07 21:32:36
|
Revision: 999 http://assorted.svn.sourceforge.net/assorted/?rev=999&view=rev Author: yangzhang Date: 2008-10-07 21:32:33 +0000 (Tue, 07 Oct 2008) Log Message: ----------- checkpoint. accepting new tcp connections works, but things are messy. tcp writes are still not supported (only udp sends as before). Modified Paths: -------------- java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Modified: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 16:44:24 UTC (rev 998) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 21:32:33 UTC (rev 999) @@ -3,8 +3,6 @@ import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.util.ArrayList; -import java.util.List; import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ScheduledFuture; @@ -20,7 +18,6 @@ public class Reactor { private final Selector selector; - private final List<Session> sessions = new ArrayList<Session>(); private boolean doShutdown = false; private final PriorityQueue<ReactorTask> tasks = new PriorityQueue<ReactorTask>(); @@ -69,11 +66,16 @@ if (updated > 0) { Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { + System.out.println("here"); if (key.isValid()) { if (key.isReadable()) { ((Session) key.attachment()).read(key); } else if (key.isWritable()) { ((Session) key.attachment()).write(key); + } else if (key.isConnectable()) { + ((Session) key.attachment()).read(key); + } else if (key.isAcceptable()) { + ((Session) key.attachment()).read(key); } } } Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 16:44:24 UTC (rev 998) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 21:32:33 UTC (rev 999) @@ -5,6 +5,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -42,7 +43,7 @@ }; } - public void test() throws Exception { + public void testUDP() throws Exception { InetAddress localhost = InetAddress.getLocalHost(); int serverPort = 11111, clientPort = 22222; final InetSocketAddress serverSa, clientSa; @@ -103,9 +104,36 @@ } }); } + + public void testTCP() throws Exception { + InetAddress localhost = InetAddress.getLocalHost(); + int serverPort = 11111, clientPort = 22222; + final InetSocketAddress serverSa, clientSa; + serverSa = new InetSocketAddress(localhost, serverPort); + clientSa = new InetSocketAddress(localhost, clientPort); + Reactor r = new Reactor(); + ReactorHandler h = new AbstractHandler() { + @Override + public void handleConnect(Session listenerSession, + SocketChannel src, Session clientSession) { + System.out.println("connection"); + } + + @Override + public void handleRead(Session session, InetSocketAddress src, + ByteBuffer buf) { + System.out.println("got packet"); + } + + }; + r.register(SessionType.STREAM, null, serverSa, h); + r.react(); + } + public static void main(String args[]) throws Exception { - new ReactorTest().test(); + //new ReactorTest().testUDP(); + new ReactorTest().testTCP(); } } Modified: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-07 16:44:24 UTC (rev 998) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-07 21:32:33 UTC (rev 999) @@ -4,7 +4,6 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectableChannel; @@ -30,6 +29,7 @@ private final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); private final List<ByteBuffer> pendingWrites = new ArrayList<ByteBuffer>(); private final SessionType type; + private final Selector selector; /** * Construct a new Session object. @@ -50,6 +50,7 @@ this.handler = handler; this.remoteSa = remoteSa; this.type = type; + this.selector = selector; try { switch (type) { @@ -62,13 +63,14 @@ socket.bind(localSa); ch.register(selector, SelectionKey.OP_ACCEPT, this); channel = ch; - } else { + } else if (localSa == null && remoteSa != null) { SocketChannel ch = SocketChannel.open(); ch.configureBlocking(false); - Socket socket = ch.socket(); - socket.connect(remoteSa); ch.register(selector, SelectionKey.OP_CONNECT, this); channel = ch; + } else { + throw new IllegalArgumentException( + "At least one of remoteSa or localSa must be non-null."); } break; case DATAGRAM: { @@ -94,11 +96,12 @@ } } - Session(ReactorHandler handler, SocketChannel channel) { + Session(ReactorHandler handler, SocketChannel channel, Selector selector) { this.handler = handler; this.remoteSa = null; this.channel = channel; this.type = SessionType.STREAM; + this.selector = selector; } /** @@ -111,7 +114,7 @@ * @throws Exception */ void read(SelectionKey key) throws Exception { - while (true) { + loop: while (true) { try { final InetSocketAddress srcSa; @@ -120,13 +123,17 @@ if (channel instanceof ServerSocketChannel) { ServerSocketChannel ch = (ServerSocketChannel) channel; SocketChannel c = ch.accept(); - handler.handleConnect(this, c, new Session(handler, c)); + if (c == null) + break loop; + c.configureBlocking(false); + Session session = new Session(handler, c, selector); + SelectionKey k = c.register(selector, SelectionKey.OP_READ, session); + handler.handleConnect(this, c, session); } else { assert channel instanceof SocketChannel; SocketChannel ch = (SocketChannel) channel; srcSa = (InetSocketAddress) ch.socket() .getRemoteSocketAddress(); - handler.handleRead(this, srcSa, null); int numRead = ch.read(readBuf); if (numRead == -1) { // Remote entity shut the socket down cleanly. Do @@ -134,7 +141,9 @@ key.channel().close(); key.cancel(); } - // TODO also handle numRead == 0 + if (numRead == 0) + break loop; + handler.handleRead(this, srcSa, readBuf); } break; } @@ -158,7 +167,7 @@ } if (srcSa == null) { - break; + break loop; } // after channel wrote to buf, set lim = pos, then pos = 0 This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-07 16:44:32
|
Revision: 998 http://assorted.svn.sourceforge.net/assorted/?rev=998&view=rev Author: yangzhang Date: 2008-10-07 16:44:24 +0000 (Tue, 07 Oct 2008) Log Message: ----------- Modified Paths: -------------- java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorHandler.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Added Paths: ----------- java-reactor/trunk/src/reactor/AbstractHandler.java Added: java-reactor/trunk/src/reactor/AbstractHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-07 16:44:24 UTC (rev 998) @@ -0,0 +1,25 @@ +package reactor; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +/** + * Provides default (no-op) handler methods for all events. + * + * @author yang + * + */ +public class AbstractHandler implements ReactorHandler { + + @Override + public void handleConnect(Session listenerSession, SocketChannel src, + Session clientSession) { + } + + @Override + public void handleRead(Session session, InetSocketAddress src, + ByteBuffer buf) { + } + +} Modified: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 08:37:43 UTC (rev 997) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 16:44:24 UTC (rev 998) @@ -45,7 +45,6 @@ InetSocketAddress localSa, ReactorHandler handler) { Session session = new Session(type, remoteSa, localSa, handler, selector); - sessions.add(session); return session; } Modified: java-reactor/trunk/src/reactor/ReactorHandler.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-07 08:37:43 UTC (rev 997) +++ java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-07 16:44:24 UTC (rev 998) @@ -2,19 +2,19 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; /** - * Handler for events pertaining to a socket (session). Currently there is only - * a single event, which is the reception of a packet. - * + * Handler for events pertaining to a socket (session). + * * @author yang - * + * */ public interface ReactorHandler { /** * Handle a received packet. - * + * * @param session * The session (socket) at which the packet was received. * @param src @@ -22,6 +22,20 @@ * @param buf * The received packet. */ - public void handle(Session session, InetSocketAddress src, ByteBuffer buf); + public void handleRead(Session session, InetSocketAddress src, + ByteBuffer buf); + /** + * Handle a new TCP connection. + * + * @param listenerSession + * The session (socket) of the write. + * @param channel + * The client channel. + * @param clientSession + * The client session. + */ + public void handleConnect(Session listenerSession, SocketChannel channel, + Session clientSession); + } Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 08:37:43 UTC (rev 997) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 16:44:24 UTC (rev 998) @@ -49,8 +49,8 @@ serverSa = new InetSocketAddress(localhost, serverPort); clientSa = new InetSocketAddress(localhost, clientPort); - final ReactorHandler handler = new ReactorHandler() { - public void handle(Session service, InetSocketAddress src, + final ReactorHandler handler = new AbstractHandler() { + public void handleRead(Session service, InetSocketAddress src, ByteBuffer buf) { System.out.println("received: " + buf); } Modified: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-07 08:37:43 UTC (rev 997) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-07 16:44:24 UTC (rev 998) @@ -3,11 +3,15 @@ import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; @@ -48,23 +52,55 @@ this.type = type; try { - DatagramChannel channel = DatagramChannel.open(); - channel.configureBlocking(false); - DatagramSocket socket = channel.socket(); - socket.setReuseAddress(true); - if (localSa != null) - socket.bind(localSa); - if (remoteSa != null) - channel.connect(remoteSa); + switch (type) { + case STREAM: + if (localSa != null && remoteSa == null) { + ServerSocketChannel ch = ServerSocketChannel.open(); + ch.configureBlocking(false); + ServerSocket socket = ch.socket(); + socket.setReuseAddress(true); + socket.bind(localSa); + ch.register(selector, SelectionKey.OP_ACCEPT, this); + channel = ch; + } else { + SocketChannel ch = SocketChannel.open(); + ch.configureBlocking(false); + Socket socket = ch.socket(); + socket.connect(remoteSa); + ch.register(selector, SelectionKey.OP_CONNECT, this); + channel = ch; + } + break; + case DATAGRAM: { + DatagramChannel ch = DatagramChannel.open(); + ch.configureBlocking(false); + DatagramSocket socket = ch.socket(); + socket.setReuseAddress(true); + if (localSa != null) + socket.bind(localSa); + if (remoteSa != null) + ch.connect(remoteSa); - channel.register(selector, SelectionKey.OP_READ, this); + ch.register(selector, SelectionKey.OP_READ, this); - this.channel = channel; + channel = ch; + break; + } + default: + throw new AssertionError("unhandled session type"); + } } catch (Exception ex) { throw new RuntimeException(ex); } } + Session(ReactorHandler handler, SocketChannel channel) { + this.handler = handler; + this.remoteSa = null; + this.channel = channel; + this.type = SessionType.STREAM; + } + /** * This is called by the reactor when there is a message to be received. * Reading messages is the priority, so this is done before anything else. @@ -80,13 +116,26 @@ final InetSocketAddress srcSa; switch (type) { - default: - srcSa = null; - break; case STREAM: { - // XXX - // ServerSocketChannel ch = (ServerSocketChannel) channel; - srcSa = null; + if (channel instanceof ServerSocketChannel) { + ServerSocketChannel ch = (ServerSocketChannel) channel; + SocketChannel c = ch.accept(); + handler.handleConnect(this, c, new Session(handler, c)); + } else { + assert channel instanceof SocketChannel; + SocketChannel ch = (SocketChannel) channel; + srcSa = (InetSocketAddress) ch.socket() + .getRemoteSocketAddress(); + handler.handleRead(this, srcSa, null); + int numRead = ch.read(readBuf); + if (numRead == -1) { + // Remote entity shut the socket down cleanly. Do + // the same from our end and cancel the channel. + key.channel().close(); + key.cancel(); + } + // TODO also handle numRead == 0 + } break; } case DATAGRAM: { @@ -107,20 +156,22 @@ // TODO also handle numRead == 0 srcSa = remoteSa; } + + if (srcSa == null) { + break; + } + + // after channel wrote to buf, set lim = pos, then pos = 0 + readBuf.flip(); + // callback + handler.handleRead(this, srcSa, readBuf); + // recycle buffer + readBuf.clear(); break; } + default: + throw new AssertionError("unhandled session type"); } - - if (srcSa == null) { - break; - } - - // after channel wrote to buf, set lim = pos, then pos = 0 - readBuf.flip(); - // callback - handler.handle(this, srcSa, readBuf); - // recycle buffer - readBuf.clear(); } catch (IOException e) { // The remote forcibly closed the connection, cancel // the selection key and close the channel. @@ -192,8 +243,7 @@ bytes = ((DatagramChannel) channel).send(writeBuf, dst); break; default: - bytes = 0; - break; + throw new AssertionError("unhandled session type"); } // TODO: don't trigger an assertion failure! assert bytes == writeBuf.limit(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-07 08:41:59
|
Revision: 996 http://assorted.svn.sourceforge.net/assorted/?rev=996&view=rev Author: yangzhang Date: 2008-10-07 08:37:33 +0000 (Tue, 07 Oct 2008) Log Message: ----------- start documenting changes for next version Modified Paths: -------------- python-commons/trunk/README Modified: python-commons/trunk/README =================================================================== --- python-commons/trunk/README 2008-10-07 08:35:08 UTC (rev 995) +++ python-commons/trunk/README 2008-10-07 08:37:33 UTC (rev 996) @@ -37,6 +37,11 @@ Changes ------- +version 0.6, 2008-10-?? + +- ??? (review!) +- released for [MIT 6.00 courseware](http://assorted.sf.net/mit600/) + version 0.5, 2008-05-14 - added `cp1252_to_unicode()` This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-07 08:40:24
|
Revision: 997 http://assorted.svn.sourceforge.net/assorted/?rev=997&view=rev Author: yangzhang Date: 2008-10-07 08:37:43 +0000 (Tue, 07 Oct 2008) Log Message: ----------- making space/preparing for introduction of stream sockets Modified Paths: -------------- java-reactor/trunk/README java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Added Paths: ----------- java-reactor/trunk/src/reactor/SessionType.java Modified: java-reactor/trunk/README =================================================================== --- java-reactor/trunk/README 2008-10-07 08:37:33 UTC (rev 996) +++ java-reactor/trunk/README 2008-10-07 08:37:43 UTC (rev 997) @@ -11,8 +11,23 @@ scheduled executor tasks - these are run in-line (using timeouts to select). Aside from its simplicity, efficiency, and support for scheduled tasks, it also -has no dependencies except for Java 6. Currently only UDP is supported. +has no dependencies except for Java 6. Both TCP and UDP are supported. +Setup +----- + +Requirements: + +- [Java] 6 + +[Java]: http://java.sun.com/ + +Suggested: + +- [SimpleBuild] + +[SimpleBuild]: http://assorted.sf.net/simple-build/ + Usage ----- @@ -31,6 +46,17 @@ [SRON]: https://moo.cmcl.cs.cmu.edu/trac/scaleron [slf4j]: http://www.slf4j.org/ +Changes +------- + +version 0.2, 2008-10-08 + +- added TCP support + +version 0.1, 2008-03-03 + +- initial release; UDP only + Links ----- Modified: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 08:37:33 UTC (rev 996) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-10-07 08:37:43 UTC (rev 997) @@ -13,9 +13,9 @@ /** * A simple select-based reactor for event-based asynchronous IO programming. It * supports scheduling of events. - * + * * @author yang - * + * */ public class Reactor { @@ -29,28 +29,29 @@ } /** - * Register a new session (i.e. socket). Currently, remoteSa is ignored. It - * was intended for creating restricted datagram sockets, which have better - * performance but occupy a file descriptor each. - * + * Register a new session (i.e. socket). + * * @param remoteSa - * Ignored. + * If non-null, create a restricted datagram socket, which has + * better performance but occupies a file descriptor. * @param localSa - * The local address and port to send/receive messages on. + * The local address and port to send/receive messages on, or + * null for auto port selection. * @param handler * The handler for events on this socket. * @return */ - public Session register(InetSocketAddress remoteSa, + public Session register(SessionType type, InetSocketAddress remoteSa, InetSocketAddress localSa, ReactorHandler handler) { - Session session = new Session(remoteSa, localSa, handler, selector); + Session session = new Session(type, remoteSa, localSa, handler, + selector); sessions.add(session); return session; } /** * The main reactor loop. This runs until the shutdown method is called. - * + * * @throws Exception */ public void react() throws Exception { @@ -93,7 +94,7 @@ /** * Schedule a task to occur after a delay. - * + * * @param r * The Runnable to be executed. * @param delay @@ -111,7 +112,7 @@ /** * Schedule a task with a fixed delay. - * + * * @param r * The Runnable to be repeatedly executed. * @param initialDelay @@ -136,10 +137,10 @@ /** * Cancel a task. This is used as a callback from ReactorTask. Removes a * task from the tasks queue. - * + * * TODO: remove the return value and assert that the return value of the * remove call is the task itself? - * + * * @param task * The task to be canceled. * @return The result of the queue removal (either the task itself, or null Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 08:37:33 UTC (rev 996) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-07 08:37:43 UTC (rev 997) @@ -11,99 +11,101 @@ /** * A simple test for the reactor. + * * @author yang - * + * */ public class ReactorTest { - ExecutorService e; + ExecutorService e; - public ReactorTest() { - e = Executors.newCachedThreadPool(); - } + public ReactorTest() { + e = Executors.newCachedThreadPool(); + } - public void spawn(final Runnable r) { - e.submit(new Runnable() { - public void run() { - try { - r.run(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - } + public void spawn(final Runnable r) { + e.submit(new Runnable() { + public void run() { + try { + r.run(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } - private Runnable makeRunnable(final int i) { - return new Runnable() { - public void run() { - System.out.println(i); - } - }; - } + private Runnable makeRunnable(final int i) { + return new Runnable() { + public void run() { + System.out.println(i); + } + }; + } - public void test() throws Exception { - InetAddress localhost = InetAddress.getLocalHost(); - int serverPort = 11111, clientPort = 22222; - final InetSocketAddress serverSa, clientSa; - serverSa = new InetSocketAddress(localhost, serverPort); - clientSa = new InetSocketAddress(localhost, clientPort); + public void test() throws Exception { + InetAddress localhost = InetAddress.getLocalHost(); + int serverPort = 11111, clientPort = 22222; + final InetSocketAddress serverSa, clientSa; + serverSa = new InetSocketAddress(localhost, serverPort); + clientSa = new InetSocketAddress(localhost, clientPort); - final ReactorHandler handler = new ReactorHandler() { - public void handle(Session service, InetSocketAddress src, - ByteBuffer buf) { - System.out.println("received: " + buf); - } - }; + final ReactorHandler handler = new ReactorHandler() { + public void handle(Session service, InetSocketAddress src, + ByteBuffer buf) { + System.out.println("received: " + buf); + } + }; - spawn(new Runnable() { - public void run() { - try { - Reactor r = new Reactor(); - r.register(null, serverSa, handler); + spawn(new Runnable() { + public void run() { + try { + Reactor r = new Reactor(); + r.register(SessionType.DATAGRAM, null, serverSa, handler); - for (int i = 0; i < 10; i++) - r.schedule(makeRunnable(i), 200 * i, - TimeUnit.MILLISECONDS); + for (int i = 0; i < 10; i++) + r.schedule(makeRunnable(i), 200 * i, + TimeUnit.MILLISECONDS); - Thread.sleep(1000); - r.react(); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - }); + Thread.sleep(1000); + r.react(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + }); - spawn(new Runnable() { - public void run() { - try { - Reactor r = new Reactor(); - ByteBuffer writeBuf = ByteBuffer.allocate(5); - Session s = r.register(null, clientSa, handler); - Thread.sleep(2000); - s.send(writeBuf, clientSa); - r.react(); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - }); + spawn(new Runnable() { + public void run() { + try { + Reactor r = new Reactor(); + ByteBuffer writeBuf = ByteBuffer.allocate(5); + Session s = r.register(SessionType.DATAGRAM, null, + clientSa, handler); + Thread.sleep(2000); + s.send(writeBuf, clientSa); + r.react(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + }); - spawn(new Runnable() { - public void run() { - try { - byte[] writeBuf = new byte[] { 0, 1, 2, 3 }; - Thread.sleep(3000); - new DatagramSocket().send(new DatagramPacket(writeBuf, - writeBuf.length, serverSa)); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - }); - } + spawn(new Runnable() { + public void run() { + try { + byte[] writeBuf = new byte[] { 0, 1, 2, 3 }; + Thread.sleep(3000); + new DatagramSocket().send(new DatagramPacket(writeBuf, + writeBuf.length, serverSa)); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + }); + } - public static void main(String args[]) throws Exception { - new ReactorTest().test(); - } + public static void main(String args[]) throws Exception { + new ReactorTest().test(); + } } Modified: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-07 08:37:33 UTC (rev 996) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-07 08:37:43 UTC (rev 997) @@ -5,6 +5,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.ArrayList; @@ -13,37 +14,41 @@ /** * Represents a "session," which is probably a bad name for what is basically a * wrapper around a datagram socket. - * + * * @author yang - * + * */ public class Session { - private final DatagramChannel channel; + private final SelectableChannel channel; private final ReactorHandler handler; private final InetSocketAddress remoteSa; private final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); private final List<ByteBuffer> pendingWrites = new ArrayList<ByteBuffer>(); + private final SessionType type; /** * Construct a new Session object. - * + * * @param remoteSa - * Ignored. + * If non-null, create a restricted datagram socket, which has + * better performance but occupies a file descriptor. * @param localSa - * The local socket address on which to send/receive packets. + * The local address and port to send/receive messages on, or + * null for auto port selection. * @param handler * The handler for events on this socket. * @param selector * The selector that is used in the current reactor. */ - Session(InetSocketAddress remoteSa, InetSocketAddress localSa, - ReactorHandler handler, Selector selector) { + Session(SessionType type, InetSocketAddress remoteSa, + InetSocketAddress localSa, ReactorHandler handler, Selector selector) { this.handler = handler; this.remoteSa = remoteSa; + this.type = type; try { - channel = DatagramChannel.open(); + DatagramChannel channel = DatagramChannel.open(); channel.configureBlocking(false); DatagramSocket socket = channel.socket(); socket.setReuseAddress(true); @@ -53,6 +58,8 @@ channel.connect(remoteSa); channel.register(selector, SelectionKey.OP_READ, this); + + this.channel = channel; } catch (Exception ex) { throw new RuntimeException(ex); } @@ -61,7 +68,7 @@ /** * This is called by the reactor when there is a message to be received. * Reading messages is the priority, so this is done before anything else. - * + * * @param key * The key into the selector's socket set representing our * socket. @@ -70,24 +77,39 @@ void read(SelectionKey key) throws Exception { while (true) { try { - InetSocketAddress srcSa; + final InetSocketAddress srcSa; - if (remoteSa == null) { - srcSa = (InetSocketAddress) channel.receive(readBuf); - } else { - int numRead = channel.read(readBuf); - if (numRead == -1) { - // Remote entity shut the socket down - // cleanly. - // Do - // the same from our end and cancel the - // channel. - key.channel().close(); - key.cancel(); + switch (type) { + default: + srcSa = null; + break; + case STREAM: { + // XXX + // ServerSocketChannel ch = (ServerSocketChannel) channel; + srcSa = null; + break; + } + case DATAGRAM: { + DatagramChannel ch = (DatagramChannel) channel; + if (remoteSa == null) { + srcSa = (InetSocketAddress) ch.receive(readBuf); + } else { + int numRead = ch.read(readBuf); + if (numRead == -1) { + // Remote entity shut the socket down + // cleanly. + // Do + // the same from our end and cancel the + // channel. + key.channel().close(); + key.cancel(); + } + // TODO also handle numRead == 0 + srcSa = remoteSa; } - // TODO also handle numRead == 0 - srcSa = remoteSa; + break; } + } if (srcSa == null) { break; @@ -112,7 +134,7 @@ * Handle the event where the socket is ready to be written. This is * currently not used since the send() method blindly writes immediately to * the socket - which is bad. - * + * * @param key * The key into the selector's socket set that represents our * socket. @@ -122,7 +144,14 @@ // Write until there's not more data ... while (!pendingWrites.isEmpty()) { ByteBuffer buf = (ByteBuffer) pendingWrites.get(0); - channel.write(buf); + switch (type) { + case STREAM: + // XXX + break; + case DATAGRAM: + ((DatagramChannel) channel).write(buf); + break; + } if (buf.remaining() > 0) { // ... or the socket's buffer fills up break; @@ -144,7 +173,7 @@ * Send messages from this socket. This does not enqueue anything onto the * internal pendingWrites buffer if the socket's buffer is full, but instead * triggers an assertion failure. - * + * * @param writeBuf * The packet to be sent. * @param dst @@ -153,13 +182,26 @@ */ public void send(ByteBuffer writeBuf, InetSocketAddress dst) throws Exception { - int bytes = channel.send(writeBuf, dst); + final int bytes; + switch (type) { + case STREAM: + // XXX + bytes = 0; + break; + case DATAGRAM: + bytes = ((DatagramChannel) channel).send(writeBuf, dst); + break; + default: + bytes = 0; + break; + } + // TODO: don't trigger an assertion failure! assert bytes == writeBuf.limit(); } /** * Close the underlying socket. - * + * * @throws Exception */ public void close() throws Exception { Added: java-reactor/trunk/src/reactor/SessionType.java =================================================================== --- java-reactor/trunk/src/reactor/SessionType.java (rev 0) +++ java-reactor/trunk/src/reactor/SessionType.java 2008-10-07 08:37:43 UTC (rev 997) @@ -0,0 +1,7 @@ +package reactor; + +public enum SessionType { + + DATAGRAM, STREAM + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-07 08:36:14
|
Revision: 995 http://assorted.svn.sourceforge.net/assorted/?rev=995&view=rev Author: yangzhang Date: 2008-10-07 08:35:08 +0000 (Tue, 07 Oct 2008) Log Message: ----------- Requirement -> Suggested Modified Paths: -------------- scala-commons/trunk/README Modified: scala-commons/trunk/README =================================================================== --- scala-commons/trunk/README 2008-10-07 07:50:23 UTC (rev 994) +++ scala-commons/trunk/README 2008-10-07 08:35:08 UTC (rev 995) @@ -31,6 +31,9 @@ Requirements: - [Scala] 2.7.2-RC2 + +Suggested: + - [SimpleBuild] [SimpleBuild]: http://assorted.sf.net/simple-build/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-07 07:50:33
|
Revision: 994 http://assorted.svn.sourceforge.net/assorted/?rev=994&view=rev Author: yangzhang Date: 2008-10-07 07:50:23 +0000 (Tue, 07 Oct 2008) Log Message: ----------- updating for scala 2.7.2 Modified Paths: -------------- scala-commons/trunk/README scala-commons/trunk/publish.bash scala-commons/trunk/src/commons/Misc.scala Modified: scala-commons/trunk/README =================================================================== --- scala-commons/trunk/README 2008-10-07 01:58:49 UTC (rev 993) +++ scala-commons/trunk/README 2008-10-07 07:50:23 UTC (rev 994) @@ -25,6 +25,16 @@ [JScrape]: http://apsquared.net/JScrape.html [gnuplot]: http://www.gnuplot.info/ +Setup +----- + +Requirements: + +- [Scala] 2.7.2-RC2 +- [SimpleBuild] + +[SimpleBuild]: http://assorted.sf.net/simple-build/ + Related work ------------ Modified: scala-commons/trunk/publish.bash =================================================================== --- scala-commons/trunk/publish.bash 2008-10-07 01:58:49 UTC (rev 993) +++ scala-commons/trunk/publish.bash 2008-10-07 07:50:23 UTC (rev 994) @@ -7,7 +7,7 @@ } fullname='Scala Commons' -version=0.1 +version=0.2 license=scala websrcs=( README ) webfiles=( src/doc ) Modified: scala-commons/trunk/src/commons/Misc.scala =================================================================== --- scala-commons/trunk/src/commons/Misc.scala 2008-10-07 01:58:49 UTC (rev 993) +++ scala-commons/trunk/src/commons/Misc.scala 2008-10-07 07:50:23 UTC (rev 994) @@ -60,10 +60,6 @@ */ def !==(that: String) = !(===(that)) /** - * Repeat the string n times. - */ - def *(n: Int) = repeat(s) take n mkString ("", "", "") - /** * Truncate a string to be of length n. */ def truncate(n: Int) = s substring (0, s.length - n) @@ -75,7 +71,7 @@ /** * String interpolator. */ - def format(a: Array[AnyRef]) = String.format(s, a) + def format(a: Array[AnyRef]) = String.format(s, a:_*) } implicit def str2xstr(s: String): XString = XString(s) implicit def xstr2str(s: XString): String = s.s This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-07 01:58:59
|
Revision: 993 http://assorted.svn.sourceforge.net/assorted/?rev=993&view=rev Author: yangzhang Date: 2008-10-07 01:58:49 +0000 (Tue, 07 Oct 2008) Log Message: ----------- added note deprecating scala doc search (see vscaladoc) Modified Paths: -------------- scala-doc-search/trunk/README Modified: scala-doc-search/trunk/README =================================================================== --- scala-doc-search/trunk/README 2008-10-06 22:25:11 UTC (rev 992) +++ scala-doc-search/trunk/README 2008-10-07 01:58:49 UTC (rev 993) @@ -1,3 +1,10 @@ +Status +------ + +This tool was made obsolete by [vscaladoc], which includes search capabilities. + +[vscaladoc]: http://code.google.com/p/vscaladoc/ + Overview -------- This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-06 22:25:21
|
Revision: 992 http://assorted.svn.sourceforge.net/assorted/?rev=992&view=rev Author: yangzhang Date: 2008-10-06 22:25:11 +0000 (Mon, 06 Oct 2008) Log Message: ----------- added network IO benchmark Added Paths: ----------- netio-bench/ netio-bench/trunk/ netio-bench/trunk/src/ netio-bench/trunk/src/Makefile netio-bench/trunk/src/epoll.cc Added: netio-bench/trunk/src/Makefile =================================================================== --- netio-bench/trunk/src/Makefile (rev 0) +++ netio-bench/trunk/src/Makefile 2008-10-06 22:25:11 UTC (rev 992) @@ -0,0 +1,9 @@ +all: epoll + +epoll: epoll.cc + g++ -I../../../cpp-commons/trunk/src/ -Wall -O3 -o $@ $< + +clean: + rm -f epoll + +.PHONY: clean Added: netio-bench/trunk/src/epoll.cc =================================================================== --- netio-bench/trunk/src/epoll.cc (rev 0) +++ netio-bench/trunk/src/epoll.cc 2008-10-06 22:25:11 UTC (rev 992) @@ -0,0 +1,106 @@ +#include <fcntl.h> +#include <stdio.h> +#include <sys/epoll.h> +#include <unistd.h> + +#include <iostream> + +#include <commons/check.h> +#include <commons/closing.h> +#include <commons/sockets.h> + +using namespace commons; +using namespace std; + +/** + * Read data from the given file descriptor until we would block (EAGAIN) or we + * hit EOF/an error. + * \return true if we hit EAGAIN, false on EOF or unexpected error. + */ +static bool +consume(int fd) { + while (true) { + char buf[1024]; + int bytes = read(fd, buf, sizeof buf); + if (bytes == -1) { + if (errno == EAGAIN) { + return true; + } else { + perror("read"); + return false; + } + } + if (bytes == 0) { + return false; + } + + // Write the data to stdout + checknneg(write(1, buf, bytes) == -1); + } +} + +int +main(int argc, char* argv[]) { + int server = tcp_listen(8080, true); + + // Make sure the fd is finally closed. + closingfd closer(server); + + // Create our epoll file descriptor + const int max_events = 16; + int epoll_fd = checknneg(epoll_create(max_events)); + + // Add our server fd to the epoll event loop + struct epoll_event event; + event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; + event.data.fd = server; + checknneg(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server, &event)); + + // Execute the epoll event loop + while (true) { + struct epoll_event events[max_events]; + int num_fds = epoll_wait(epoll_fd, events, max_events, -1); + + for (int i = 0; i < num_fds; i++) { + // Case 1: Error condition + if (events[i].events & (EPOLLHUP | EPOLLERR)) { + fputs("epoll: EPOLLERR", stderr); + close(events[i].data.fd); + continue; + } + check(events[i].events & EPOLLIN); + + // Case 2: Our server is receiving a connection + if (events[i].data.fd == server) { + struct sockaddr remote_addr; + socklen_t addr_size = sizeof(remote_addr); + int connection = accept(server, &remote_addr, &addr_size); + if (connection == -1) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + perror("accept"); + } + continue; + } + + // Make the connection non-blocking + checknneg(fcntl(connection, F_SETFL, + O_NONBLOCK | fcntl(connection, F_GETFL, 0))); + + // Add the connection to our epoll loop + event.data.fd = connection; + checknneg(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, connection, + &event)); + continue; + } + + // Case 3: One of our connections has read data + if (!consume(events[i].data.fd)) { + // epoll will remove the fd from its set + // automatically when the fd is closed + close(events[i].data.fd); + } + } + } + + return 0; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-06 22:24:23
|
Revision: 991 http://assorted.svn.sourceforge.net/assorted/?rev=991&view=rev Author: yangzhang Date: 2008-10-06 22:24:13 +0000 (Mon, 06 Oct 2008) Log Message: ----------- added closingfd; reworked tcp_listen into tcp_listen (with non-blocking option) and server_socket Modified Paths: -------------- cpp-commons/trunk/src/commons/closing.h cpp-commons/trunk/src/commons/sockets.h Modified: cpp-commons/trunk/src/commons/closing.h =================================================================== --- cpp-commons/trunk/src/commons/closing.h 2008-10-05 18:44:39 UTC (rev 990) +++ cpp-commons/trunk/src/commons/closing.h 2008-10-06 22:24:13 UTC (rev 991) @@ -14,6 +14,11 @@ T x; }; + class closingfd : closing<int> { + public: + closingfd(int x) : closing<int>(x) {} + }; + } #endif Modified: cpp-commons/trunk/src/commons/sockets.h =================================================================== --- cpp-commons/trunk/src/commons/sockets.h 2008-10-05 18:44:39 UTC (rev 990) +++ cpp-commons/trunk/src/commons/sockets.h 2008-10-06 22:24:13 UTC (rev 991) @@ -14,17 +14,28 @@ { /** - * Create a listener socket, with SO_REUSEADDR. + * Create a server socket bound to localhost, with SO_REUSEADDR enabled. * \param[in] port The port to listen on. - * \return The listener socket. + * \param[in] nb Whether the socket should be non-blocking. + * \return The server socket. */ int - tcp_listen(int port) + server_socket(int port, bool nb = false) { // Create the socket. - int sfd = checknneg(socket(PF_INET, SOCK_STREAM, 0)); + int fd = checknneg(socket(PF_INET, SOCK_STREAM, 0)); try { + // Configure the socket. + int n = 1; + check0x(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&n, + sizeof(n))); + + // Make our socket non-blocking if desired. + if (nb) { + checknneg(fcntl(fd, F_SETFL, O_NONBLOCK | fcntl(fd, F_GETFL, 0))); + } + // Create the local socket address. struct sockaddr_in sa; bzero(&sa, sizeof(sa)); @@ -32,19 +43,33 @@ sa.sin_port = htons(port); sa.sin_addr.s_addr = htonl(INADDR_ANY); - // Configure the socket. - int n = 1; - check0x(setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof(n))); - // Bind the socket to the local socket address. - check0x(bind(sfd, (struct sockaddr*) &sa, sizeof(struct sockaddr_in))); + check0x(bind(fd, (struct sockaddr*) &sa, sizeof(struct sockaddr_in))); - // Start listening. - check0x(listen(sfd, 256)); + return fd; + } catch (...) { + close(fd); + throw; + } + } - return sfd; + /** + * Create a server socket and listen on it. + * \param[in] port The port to listen on. + * \param[in] nb Whether the socket should be non-blocking. + * \return The listener socket. + */ + int + tcp_listen(int port, bool nb = false) + { + int fd = server_socket(port, nb); + try { + // SOMAXCONN is the kernel's limit on the maximum number of socket + // connections. + check0x(listen(fd, SOMAXCONN)); + return fd; } catch (...) { - close(sfd); + close(fd); throw; } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-05 18:46:14
|
Revision: 990 http://assorted.svn.sourceforge.net/assorted/?rev=990&view=rev Author: yangzhang Date: 2008-10-05 18:44:39 +0000 (Sun, 05 Oct 2008) Log Message: ----------- added pidgin and xchat log backups Modified Paths: -------------- configs/trunk/src/cron/backup.bash Modified: configs/trunk/src/cron/backup.bash =================================================================== --- configs/trunk/src/cron/backup.bash 2008-10-05 18:44:13 UTC (rev 989) +++ configs/trunk/src/cron/backup.bash 2008-10-05 18:44:39 UTC (rev 990) @@ -14,4 +14,6 @@ eval `keychain --eval --nogui id_dsa 2> /dev/null` export PASSPHRASE="$( cat ~/.backup.auth )" -exec duplicity $args ~/personal/ scp://hv//export/home/yang/backup-zs.ath.cx +duplicity $args ~/personal/ scp://hv//export/home/yang/backup-zs.ath.cx +duplicity $args ~/.purple/ scp://hv//export/home/yang/purple-zs.ath.cx +duplicity $args ~/.xchat2/ scp://hv//export/home/yang/xchat-zs.ath.cx This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-05 18:46:13
|
Revision: 988 http://assorted.svn.sourceforge.net/assorted/?rev=988&view=rev Author: yangzhang Date: 2008-10-05 18:42:47 +0000 (Sun, 05 Oct 2008) Log Message: ----------- removed shebang from lib Modified Paths: -------------- simple-zdb/trunk/src/zdb.py Modified: simple-zdb/trunk/src/zdb.py =================================================================== --- simple-zdb/trunk/src/zdb.py 2008-10-05 18:42:32 UTC (rev 987) +++ simple-zdb/trunk/src/zdb.py 2008-10-05 18:42:47 UTC (rev 988) @@ -1,5 +1,3 @@ -#!/usr/bin/env python - from __future__ import with_statement from cPickle import load, dump, dumps This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-05 18:45:14
|
Revision: 989 http://assorted.svn.sourceforge.net/assorted/?rev=989&view=rev Author: yangzhang Date: 2008-10-05 18:44:13 +0000 (Sun, 05 Oct 2008) Log Message: ----------- updated active/passive info; added mit-tools Modified Paths: -------------- assorted-site/trunk/index.txt Modified: assorted-site/trunk/index.txt =================================================================== --- assorted-site/trunk/index.txt 2008-10-05 18:42:47 UTC (rev 988) +++ assorted-site/trunk/index.txt 2008-10-05 18:44:13 UTC (rev 989) @@ -12,17 +12,18 @@ - ZDB: object database with an emphasis on semantics (passive) - General-purpose libraries ("commons") for various languages or platforms - - [Python Commons](python-commons) (passive) + - [Python Commons](python-commons) (active) - [Scala Commons](scala-commons) (passive) - [Java Reactor](java-reactor): simple event loop for single-threaded asynchronous IO and task scheduling (done) - - [C++ Commons](cpp-commons) (active) + - [C++ Commons](cpp-commons) (passive) - Haskell Commons (hiatus) - TeX Commons (hiatus) - [Shell Tools](shell-tools): programs written in a variety of languages and oriented toward shell scripting and system administration (passive) - [AFX](python-afx): extensions (e.g. threading support) for the AF asynchronous programming framework (passive) + - MIT Tools: scripts for working in MIT infrastructure (Athena, SIPB, etc.) - UI libraries - Scala TUI: a declarative reactive programming toolkit for constructing [ncurses]-based text user interfaces (hiatus) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-05 18:45:12
|
Revision: 987 http://assorted.svn.sourceforge.net/assorted/?rev=987&view=rev Author: yangzhang Date: 2008-10-05 18:42:32 +0000 (Sun, 05 Oct 2008) Log Message: ----------- updated documentation Modified Paths: -------------- sharing-gateway/trunk/README Modified: sharing-gateway/trunk/README =================================================================== --- sharing-gateway/trunk/README 2008-10-05 17:17:01 UTC (rev 986) +++ sharing-gateway/trunk/README 2008-10-05 18:42:32 UTC (rev 987) @@ -9,12 +9,20 @@ but: - uses a simple YAML configuration file format +- allows usage of higher-level tools than `mount`, such as `sshfs` - can handle hostnames instead of IPs for CIFS shares -- can create the mountpoint directories +- can create and remove the mountpoint directories The rest of this is mostly documentation on how to configure your own servers for the gateway. +Explicitly supported backends: + +- bind +- curlftpfs +- sshfs +- cifs + Setup ----- @@ -30,6 +38,17 @@ [Python]: http://python.org/ [Python YAML]: http://pyyaml.org/ +### Backends + +The following are conditionally required, depending on what kind of file +systems you want to mount. + +- [sshfs] +- [curlftpfs] + +[sshfs]: http://fuse.sourceforge.net/sshfs.html +[curlftpfs]: http://curlftpfs.sourceforge.net/ + ### Web Frontend Requirements: @@ -173,3 +192,17 @@ - uploads The FTP server we'll use is ProFTPD. + +Tips +---- + +- When using a SSHFS, you may have a key pair that you use for authentication, + so that with ssh-agent you can get (nearly) passwordless logins. However, to + allow the root user to mount this SSHFS using your credentials, you can still + get a passwordless without running ssh-agent for root by creating a + passwordless (insecure) copy of your private key. This is done above in the + instructions for preparing certificates for HTTP/SSL (shown here for either + RSA or DSA): + + openssl rsa -in id_rsa -out insecure_id_rsa + openssl dsa -in id_dsa -out insecure_id_dsa This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-05 18:26:33
|
Revision: 985 http://assorted.svn.sourceforge.net/assorted/?rev=985&view=rev Author: yangzhang Date: 2008-10-05 17:16:10 +0000 (Sun, 05 Oct 2008) Log Message: ----------- added support for sshfs, bind; better err msgs; tweaked documentation Modified Paths: -------------- sharing-gateway/trunk/src/gateway.py sharing-gateway/trunk/src/index.pandoc Modified: sharing-gateway/trunk/src/gateway.py =================================================================== --- sharing-gateway/trunk/src/gateway.py 2008-10-02 22:18:03 UTC (rev 984) +++ sharing-gateway/trunk/src/gateway.py 2008-10-05 17:16:10 UTC (rev 985) @@ -21,13 +21,13 @@ class my_exception( Exception ): pass -def run( cmd ): - p = Popen( cmd ) +def sudo( cmd ): + p = Popen( ['sudo'] + cmd ) if p.wait() != 0: raise my_exception( 'command failed with error code %s: %s' % ( p.returncode, cmd ) ) def runout( cmd, f ): - p = Popen( cmd, stdout = f ) + p = Popen( ['sudo'] + cmd, stdout = f ) if p.wait() != 0: raise my_exception( 'command failed with error code %s: %s' % ( p.returncode, cmd ) ) @@ -45,26 +45,38 @@ if cmd == 'mount': for s in cfg.shares: - if s.type == 'cifs': + try: + mountpt = mountdir / s.name + sudo( [ 'mkdir', '-p', mountpt ] ) d = structs2dicts( s ) - d[ 'ip' ] = gethostbyname( s.host ) - mountpt = mountdir / s.name - try: - run( [ 'sudo', 'mkdir', '-p', mountpt ] ) - run( 'sudo mount -t cifs -o'.split() + + if s.type == 'cifs': + d[ 'ip' ] = gethostbyname( s.host ) + sudo( 'mount -t cifs -o'.split() + [ 'user=%(user)s,pass=%(pass)s,ip=%(ip)s' % d, s.share, mountpt ] ) - except: - print >> stderr, 'error mounting', s.name - else: - raise my_exception( 'unknown share type: %s' % s.type ) + elif s.type == 'sshfs': + sudo( [ 'sshfs', s.share, mountpt, + '-o', 'IdentityFile=%(key)s' % d, '-o', 'allow_other' ] ) + elif s.type == 'bind': + sudo( [ 'mount', '--bind', s.share, mountpt ] ) + else: + raise my_exception( 'unknown share type: %s' % s.type ) + except Exception, ex: + print >> stderr, 'error mounting', s.name + print >> stderr, ex elif cmd == 'umount': for s in cfg.shares: mountpt = mountdir / s.name try: - run( [ 'sudo', 'umount', mountpt ] ) - run( [ 'sudo', 'rmdir', mountpt ] ) - except: + if s.type in 'cifs bind'.split(): + sudo( [ 'umount', mountpt ] ) + elif s.type == 'sshfs': + sudo( [ 'fusermount', '-u', mountpt ] ) + else: + raise my_exception( 'unknown share type: %s' % s.type ) + sudo( [ 'rmdir', mountpt ] ) + except Exception, ex: print >> stderr, 'error unmounting', s.name + print >> stderr, ex elif cmd == 'index': for s in cfg.shares: print 'indexing', s.name Modified: sharing-gateway/trunk/src/index.pandoc =================================================================== --- sharing-gateway/trunk/src/index.pandoc 2008-10-02 22:18:03 UTC (rev 984) +++ sharing-gateway/trunk/src/index.pandoc 2008-10-05 17:16:10 UTC (rev 985) @@ -23,8 +23,8 @@ Each of the top-level directories represents a different share, so their speeds and availability may vary. -About ------ +About Sharing Gateway +--------------------- To find more information about this system or to run your own sharing gateway, take a look at the [project homepage]. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-05 17:18:07
|
Revision: 986 http://assorted.svn.sourceforge.net/assorted/?rev=986&view=rev Author: yangzhang Date: 2008-10-05 17:17:01 +0000 (Sun, 05 Oct 2008) Log Message: ----------- added dicts2structs, structs2dicts, tests for these; improved struct ctor Modified Paths: -------------- python-commons/trunk/src/commons/structs.py Modified: python-commons/trunk/src/commons/structs.py =================================================================== --- python-commons/trunk/src/commons/structs.py 2008-10-05 17:16:10 UTC (rev 985) +++ python-commons/trunk/src/commons/structs.py 2008-10-05 17:17:01 UTC (rev 986) @@ -5,7 +5,7 @@ Data structures: Heaps, lists, queues, and Python hacks. """ -import copy, heapq, itertools, sys +import copy, heapq, itertools, sys, unittest class bidict( object ): """Bi-directional dictionary; assumes 1:1 mappings.""" @@ -44,7 +44,8 @@ """ General-purpose namespace structure. """ - def __init__( self, **args ): + def __init__( self, d = {}, **args ): + self.__dict__.update( d ) self.__dict__.update( args ) def __repr__( self ): fields = ( '%s = %r' % ( name, value ) @@ -525,3 +526,56 @@ if self._key is not None: item = item[1] return item + +def dicts2structs(x): + """ + Given a tree of lists/dicts, perform a deep traversal to transform all the + dicts to structs. + """ + if type(x) == dict: + return free_struct( ( k, dicts2structs(v) ) for k,v in x.iteritems()) + elif type(x) == list: + return [dicts2structs(v) for v in x] + else: + return x + +def structs2dicts(x): + """ + Given a tree of lists/structs, perform a deep traversal to transform all + the structs to dicts. + """ + if type(x) == free_struct: + return dict( ( k, structs2dicts(v) ) for k,v in x.__dict__.iteritems() ) + elif type(x) == list: + return [structs2dicts(v) for v in x] + else: + return x + +# +# Tests. +# + +class common_tests(unittest.TestCase): + def test_dicts_structs(self): + dicts = { + 'atom': 0, + 'dict': { 'atom': 'atom', 'list': [1,2,3] }, + 'list': [ 'atom', {'key': 'value'} ] + } + + structs = dicts2structs(dicts) + self.assertEqual(structs.atom, dicts['atom']) + self.assertEqual(structs.dict.atom, dicts['dict']['atom']) + self.assertEqual(structs.dict.list, dicts['dict']['list']) + self.assertEqual(structs.list[0], dicts['list'][0]) + self.assertEqual(structs.list[1].key, dicts['list'][1]['key']) + + dicts2 = structs2dicts(dicts) + self.assertEqual(dicts2['atom'], structs.atom) + self.assertEqual(dicts2['dict']['atom'], structs.dict.atom) + self.assertEqual(dicts2['dict']['list'], structs.dict.list) + self.assertEqual(dicts2['list'][0], structs.list[0]) + self.assertEqual(dicts2['list'][1]['key'], structs.list[1].key) + +if __name__ == '__main__': + unittest.main() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-02 23:22:13
|
Revision: 984 http://assorted.svn.sourceforge.net/assorted/?rev=984&view=rev Author: yangzhang Date: 2008-10-02 22:18:03 +0000 (Thu, 02 Oct 2008) Log Message: ----------- reorganized/cleaned; fixed some minor bugs; added sendmail Modified Paths: -------------- python-commons/trunk/src/commons/misc.py Modified: python-commons/trunk/src/commons/misc.py =================================================================== --- python-commons/trunk/src/commons/misc.py 2008-10-02 19:31:12 UTC (rev 983) +++ python-commons/trunk/src/commons/misc.py 2008-10-02 22:18:03 UTC (rev 984) @@ -6,27 +6,50 @@ """ __all__ = ''' +TerminalController days +default_if_none generate_bit_fields +remove_colors +run +sendmail +seq +settimeout +tc +timeout_exception wall_clock -default_if_none -seq -run -TerminalController -remove_colors wrap_color '''.split() -from contextlib import * -from subprocess import CalledProcessError, PIPE, Popen -from time import * +# +# Email +# + +import smtplib + +def sendmail(sender, to, subj, body): + msg = "From: %s\r\nTo: %s\r\nSubject:%s\r\n\r\n%s" % (sender, to, subj, body) + server = smtplib.SMTP('localhost') + try: + server.set_debuglevel(False) + server.sendmail(sender, to, msg) + finally: + server.quit() + +# +# Date/Time +# + from datetime import timedelta -import sys, re def days(td): """Returns the ceil(days in the timedelta L{td}).""" return td.days + (1 if td - timedelta(days = td.days) > timedelta() else 0) +# +# Bit fields +# + def generate_bit_fields(count): """ A generator of [2^i] for i from 0 to (count - 1). Useful for, @@ -44,6 +67,14 @@ yield j j <<= 1 +# +# Timing +# + +from time import * +from contextlib import * +from signal import * + @contextmanager def wall_clock(output): """ @@ -66,6 +97,21 @@ end = time() output[0] = end - start +class timeout_exception(Exception): pass + +def settimeout(secs): + """ + Set the signal handler for SIGALRM to raise timeout_exception, and call + alarm(secs). + """ + def handle(sig, frame): raise timeout_exception() + signal(SIGALRM, handle) + alarm(secs) + +# +# Functional +# + def default_if_none(x, d): """ Returns L{x} if it's not None, otherwise returns L{d}. @@ -80,17 +126,30 @@ f() return g() +# +# Processes +# + +from subprocess import CalledProcessError, PIPE, Popen + def run(cmd): """ Run the given command (a list of program and argument strings) and return the stdout as a string, raising a L{CalledProcessError} if the program exited - with a non-zero status. + with a non-zero status. Different from check_call because I return the + (piped) stdout. """ p = Popen(cmd, stdout=PIPE) stdout = p.communicate()[0] if p.returncode != 0: raise CalledProcessError(p.returncode, cmd) return stdout +# +# Terminal ANSI coloring +# + +import sys, re + class TerminalController: """ From U{http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/475116}. @@ -243,11 +302,12 @@ 'Removes ANSI escape codes (e.g. those for terminal colors).' return remove_colors_pat.sub('', s) -def wrap_color(s, color): +tc = TerminalController() +def wrap_color(s, color, tc = tc): """ Wraps L{s} in L{color} (resets color to NORMAL at the end). """ - return '${%s}%s${NORMAL}' % (s, color) + return tc.render( '${%s}%s${NORMAL}' % (color, s) ) import unittest This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-02 19:31:19
|
Revision: 983 http://assorted.svn.sourceforge.net/assorted/?rev=983&view=rev Author: yangzhang Date: 2008-10-02 19:31:12 +0000 (Thu, 02 Oct 2008) Log Message: ----------- added explicit exports to a few modules Modified Paths: -------------- python-commons/trunk/src/commons/files.py python-commons/trunk/src/commons/misc.py python-commons/trunk/src/commons/strs.py Modified: python-commons/trunk/src/commons/files.py =================================================================== --- python-commons/trunk/src/commons/files.py 2008-10-02 19:14:50 UTC (rev 982) +++ python-commons/trunk/src/commons/files.py 2008-10-02 19:31:12 UTC (rev 983) @@ -13,6 +13,21 @@ from __future__ import with_statement +__all__ = ''' +soft_makedirs +temp_dir +cleanse_filename +invalid_filename_chars +invalid_filename_chars_regex +disk_double_buffer +versioned_guard +versioned_cache +read_file +write_file +write_or_rm +is_nonempty_file +'''.split() + import os, re, tempfile from cPickle import * from path import path Modified: python-commons/trunk/src/commons/misc.py =================================================================== --- python-commons/trunk/src/commons/misc.py 2008-10-02 19:14:50 UTC (rev 982) +++ python-commons/trunk/src/commons/misc.py 2008-10-02 19:31:12 UTC (rev 983) @@ -5,6 +5,18 @@ Miscellanea. """ +__all__ = ''' +days +generate_bit_fields +wall_clock +default_if_none +seq +run +TerminalController +remove_colors +wrap_color +'''.split() + from contextlib import * from subprocess import CalledProcessError, PIPE, Popen from time import * Modified: python-commons/trunk/src/commons/strs.py =================================================================== --- python-commons/trunk/src/commons/strs.py 2008-10-02 19:14:50 UTC (rev 982) +++ python-commons/trunk/src/commons/strs.py 2008-10-02 19:31:12 UTC (rev 983) @@ -2,9 +2,21 @@ # vim:ft=python:et:sw=4:ts=4 """ -String formatting. +String formatting, encoding, etc. """ +__all__ = ''' +format +safe_ascii +cp1252_to_unicode_translations +cp1252_to_unicode +unwrap +indent +underline +dos2unix +unicode2html +'''.split() + import itertools, cgi, re def format( *args ): This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-02 19:15:09
|
Revision: 982 http://assorted.svn.sourceforge.net/assorted/?rev=982&view=rev Author: yangzhang Date: 2008-10-02 19:14:50 +0000 (Thu, 02 Oct 2008) Log Message: ----------- added indent, underline, dos2unix, unicode2html Modified Paths: -------------- python-commons/trunk/src/commons/strs.py Modified: python-commons/trunk/src/commons/strs.py =================================================================== --- python-commons/trunk/src/commons/strs.py 2008-10-02 19:14:28 UTC (rev 981) +++ python-commons/trunk/src/commons/strs.py 2008-10-02 19:14:50 UTC (rev 982) @@ -5,7 +5,7 @@ String formatting. """ -import itertools +import itertools, cgi, re def format( *args ): """Formats the args as they would be by the C{print} built-in.""" @@ -64,3 +64,28 @@ """ if isinstance(s, str): s = s.strip().split('\n') return ' '.join( line.strip() for line in s ) + +def indent(s, ind = ' '): + """ + Prefixes each line in L{s} with L{ind}. L{s} can be either a string (which + will be broken up into a list of lines) or a list of strings (treated as + lines). Returns a single (indented) string. + """ + if isinstance(s, str): s = s.split('\n') + return '\n'.join( ind + line for line in s ) + +def underline(s, sep): + """ + Appends to L{s} a newline and a number of repetitions of L{sep}; the number + of repetitions is the length of L{s}. + """ + return s + '\n' + (sep * len(s)) + +def dos2unix(s): + "Removes carriage returns." + return s.replace('\r','') + +pat = re.compile(u'[\u0080-\uffff]') +def unicode2html(s): + "Extends cgi.escape() with escapes for all unicode characters." + return pat.sub(lambda m: '&#%d;' % ord(m.group()), cgi.escape(s)) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-02 19:14:45
|
Revision: 981 http://assorted.svn.sourceforge.net/assorted/?rev=981&view=rev Author: yangzhang Date: 2008-10-02 19:14:28 +0000 (Thu, 02 Oct 2008) Log Message: ----------- added wrap_color, force for TerminalController, days Modified Paths: -------------- python-commons/trunk/src/commons/misc.py Modified: python-commons/trunk/src/commons/misc.py =================================================================== --- python-commons/trunk/src/commons/misc.py 2008-10-02 19:13:44 UTC (rev 980) +++ python-commons/trunk/src/commons/misc.py 2008-10-02 19:14:28 UTC (rev 981) @@ -8,10 +8,12 @@ from contextlib import * from subprocess import CalledProcessError, PIPE, Popen from time import * +from datetime import timedelta +import sys, re -# TerminalController -from sys import stdout -from re import sub +def days(td): + """Returns the ceil(days in the timedelta L{td}).""" + return td.days + (1 if td - timedelta(days = td.days) > timedelta() else 0) def generate_bit_fields(count): """ @@ -156,7 +158,7 @@ _COLORS = """BLACK BLUE GREEN CYAN RED MAGENTA YELLOW WHITE""".split() _ANSICOLORS = "BLACK RED GREEN YELLOW BLUE MAGENTA CYAN WHITE".split() - def __init__(self, term_stream=stdout, force=False): + def __init__(self, term_stream=sys.stdout, force=False): """ Create a `TerminalController` and initialize its attributes with appropriate values for the current terminal. @@ -209,7 +211,7 @@ # these, so strip them out. import curses cap = curses.tigetstr(cap_name) or '' - return sub(r'\$<\d+>[/*]?', '', cap) + return re.sub(r'\$<\d+>[/*]?', '', cap) def render(self, template): """ @@ -217,20 +219,24 @@ the corresponding terminal control string (if it's defined) or '' (if it's not). """ - return sub(r'\$\$|\${\w+}', self._render_sub, template) + return re.sub(r'\$\$|\${\w+}', self._render_sub, template) def _render_sub(self, match): s = match.group() if s == '$$': return s else: return getattr(self, s[2:-1]) -import re - remove_colors_pat = re.compile('\033\\[[0-9;]*m') def remove_colors(s): 'Removes ANSI escape codes (e.g. those for terminal colors).' return remove_colors_pat.sub('', s) +def wrap_color(s, color): + """ + Wraps L{s} in L{color} (resets color to NORMAL at the end). + """ + return '${%s}%s${NORMAL}' % (s, color) + import unittest class color_test( unittest.TestCase ): This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-02 19:13:51
|
Revision: 980 http://assorted.svn.sourceforge.net/assorted/?rev=980&view=rev Author: yangzhang Date: 2008-10-02 19:13:44 +0000 (Thu, 02 Oct 2008) Log Message: ----------- added read_file, write_file, write_or_rm, is_nonempty_file from 6.00 code Modified Paths: -------------- python-commons/trunk/src/commons/files.py Modified: python-commons/trunk/src/commons/files.py =================================================================== --- python-commons/trunk/src/commons/files.py 2008-10-02 18:47:14 UTC (rev 979) +++ python-commons/trunk/src/commons/files.py 2008-10-02 19:13:44 UTC (rev 980) @@ -169,3 +169,22 @@ else: # cache up-to-date (should be available since dlcs-timestamp exists!) with file(cache_path) as f: return load(f) + +def read_file(path): + f = file(path) + try: return f.read() + finally: f.close() + +def write_file(path, contents): + f = file(path,'w') + try: f.write(contents) + finally: f.close() + +def write_or_rm(p, contents): + 'Write the file or remove it if contents is empty.' + p = path(p) + if contents.strip(): write_file(p, contents) + elif p.isfile(): p.remove() + +def is_nonempty_file(path): + return path.isfile() and read_file(path).strip() != '' This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-02 18:47:43
|
Revision: 979 http://assorted.svn.sourceforge.net/assorted/?rev=979&view=rev Author: yangzhang Date: 2008-10-02 18:47:14 +0000 (Thu, 02 Oct 2008) Log Message: ----------- added unwrap() Modified Paths: -------------- python-commons/trunk/src/commons/strs.py Modified: python-commons/trunk/src/commons/strs.py =================================================================== --- python-commons/trunk/src/commons/strs.py 2008-10-02 18:46:58 UTC (rev 978) +++ python-commons/trunk/src/commons/strs.py 2008-10-02 18:47:14 UTC (rev 979) @@ -56,3 +56,11 @@ x = x.replace(a,b) return x +def unwrap(s): + """ + Joins a bunch of lines. L{s} is either a single string (which will be + split on newlines into a list of strings) or a list of strings + (representing lines). + """ + if isinstance(s, str): s = s.strip().split('\n') + return ' '.join( line.strip() for line in s ) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-02 18:47:27
|
Revision: 978 http://assorted.svn.sourceforge.net/assorted/?rev=978&view=rev Author: yangzhang Date: 2008-10-02 18:46:58 +0000 (Thu, 02 Oct 2008) Log Message: ----------- added a force option to TerminalController; added remove_colors; added unit test for colors Modified Paths: -------------- python-commons/trunk/src/commons/misc.py Modified: python-commons/trunk/src/commons/misc.py =================================================================== --- python-commons/trunk/src/commons/misc.py 2008-09-30 07:04:34 UTC (rev 977) +++ python-commons/trunk/src/commons/misc.py 2008-10-02 18:46:58 UTC (rev 978) @@ -156,7 +156,7 @@ _COLORS = """BLACK BLUE GREEN CYAN RED MAGENTA YELLOW WHITE""".split() _ANSICOLORS = "BLACK RED GREEN YELLOW BLUE MAGENTA CYAN WHITE".split() - def __init__(self, term_stream=stdout): + def __init__(self, term_stream=stdout, force=False): """ Create a `TerminalController` and initialize its attributes with appropriate values for the current terminal. @@ -169,7 +169,7 @@ except: return # If the stream isn't a tty, then assume it has no capabilities. - if not term_stream.isatty(): return + if not force and not term_stream.isatty(): return # Check the terminal type. If we fail, then assume that the # terminal has no capabilities. @@ -224,3 +224,22 @@ if s == '$$': return s else: return getattr(self, s[2:-1]) +import re + +remove_colors_pat = re.compile('\033\\[[0-9;]*m') +def remove_colors(s): + 'Removes ANSI escape codes (e.g. those for terminal colors).' + return remove_colors_pat.sub('', s) + +import unittest + +class color_test( unittest.TestCase ): + def test_round_trip( self ): + tc = TerminalController() + template = '${GREEN}green${BLUE}blue${NORMAL}normal' + rendered = tc.render( template ) + removed = remove_colors( rendered ) + self.assertEqual( removed, 'greenbluenormal' ) + +if __name__ == '__main__': + unittest.main() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |