Thread: [Assorted-commits] SF.net SVN: assorted: [500] java-reactor/trunk/src/reactor
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-02-25 06:54:02
|
Revision: 500 http://assorted.svn.sourceforge.net/assorted/?rev=500&view=rev Author: yangzhang Date: 2008-02-24 22:53:56 -0800 (Sun, 24 Feb 2008) Log Message: ----------- added javadoc Modified Paths: -------------- java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorHandler.java java-reactor/trunk/src/reactor/ReactorTask.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Modified: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java 2008-02-24 22:40:15 UTC (rev 499) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-02-25 06:53:56 UTC (rev 500) @@ -10,87 +10,152 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; - +/** + * A simple select-based reactor for event-based asynchronous IO programming. It + * supports scheduling of events. + * + * @author yang + * + */ public class Reactor { - private final Selector selector; - private final List<Session> sessions = new ArrayList<Session>(); - private boolean doShutdown = false; - private final PriorityQueue<ReactorTask> tasks = new PriorityQueue<ReactorTask>(); + private final Selector selector; + private final List<Session> sessions = new ArrayList<Session>(); + private boolean doShutdown = false; + private final PriorityQueue<ReactorTask> tasks = new PriorityQueue<ReactorTask>(); - public Reactor() throws Exception { - selector = Selector.open(); - } + public Reactor() throws Exception { + selector = Selector.open(); + } - public Session register(InetSocketAddress remoteSa, - InetSocketAddress localSa, ReactorHandler handler) { - Session session = new Session(remoteSa, localSa, handler, sessions - .size(), selector); - sessions.add(session); - return session; - } + /** + * Register a new session (i.e. socket). Currently, remoteSa is ignored. It + * was intended for creating restricted datagram sockets, which have better + * performance but occupy a file descriptor each. + * + * @param remoteSa + * Ignored. + * @param localSa + * The local address and port to send/receive messages on. + * @param handler + * The handler for events on this socket. + * @return + */ + public Session register(InetSocketAddress remoteSa, + InetSocketAddress localSa, ReactorHandler handler) { + Session session = new Session(remoteSa, localSa, handler, selector); + sessions.add(session); + return session; + } - public void react() throws Exception { - while (true) { - if (doShutdown) - break; + /** + * The main reactor loop. This runs until the shutdown method is called. + * + * @throws Exception + */ + public void react() throws Exception { + while (true) { + if (doShutdown) + break; - int updated; - if (tasks.isEmpty()) { - updated = selector.select(); - } else { - long t = tasks.peek().getDelay(TimeUnit.MILLISECONDS); - updated = t > 0 ? selector.select(t) : 0; - } + int updated; + if (tasks.isEmpty()) { + updated = selector.select(); + } else { + long t = tasks.peek().getDelay(TimeUnit.MILLISECONDS); + updated = t > 0 ? selector.select(t) : 0; + } - if (updated > 0) { - Set<SelectionKey> keys = selector.selectedKeys(); - for (SelectionKey key : keys) { - if (key.isValid()) { - if (key.isReadable()) { - ((Session) key.attachment()).read(key); - } else if (key.isWritable()) { - ((Session) key.attachment()).write(key); - } - } - } - keys.clear(); - } else { - // TODO impose limit on # things to run at once (perhaps even - // specify costs) - while (!tasks.isEmpty() - && tasks.peek().getDelay(TimeUnit.MILLISECONDS) == 0L) { - ReactorTask task = tasks.remove(); - task.run(); - } - } - } - selector.close(); - } + if (updated > 0) { + Set<SelectionKey> keys = selector.selectedKeys(); + for (SelectionKey key : keys) { + if (key.isValid()) { + if (key.isReadable()) { + ((Session) key.attachment()).read(key); + } else if (key.isWritable()) { + ((Session) key.attachment()).write(key); + } + } + } + keys.clear(); + } else { + // TODO impose limit on # things to run at once (perhaps even + // specify costs) + while (!tasks.isEmpty() + && tasks.peek().getDelay(TimeUnit.MILLISECONDS) == 0L) { + ReactorTask task = tasks.remove(); + task.run(); + } + } + } + selector.close(); + } - public ScheduledFuture<?> schedule(Runnable r, long delay, TimeUnit units) { - ReactorTask task = new ReactorTask(r, System.currentTimeMillis() - + units.toMillis(delay), this); - tasks.add(task); - return task; - } + /** + * Schedule a task to occur after a delay. + * + * @param r + * The Runnable to be executed. + * @param delay + * The delay to sleep before running the task. + * @param units + * The time units that the delay is in. + * @return A ScheduledFuture representing the newly created task. + */ + public ScheduledFuture<?> schedule(Runnable r, long delay, TimeUnit units) { + ReactorTask task = new ReactorTask(r, System.currentTimeMillis() + + units.toMillis(delay), this); + tasks.add(task); + return task; + } - public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable r, - final long initialDelay, final long delay, final TimeUnit units) { - return schedule(new Runnable() { - public void run() { - r.run(); - Reactor.this.schedule(this, delay, units); - } - }, initialDelay, units); - } + /** + * Schedule a task with a fixed delay. + * + * @param r + * The Runnable to be repeatedly executed. + * @param initialDelay + * The initial delay before the task is first run. + * @param delay + * The regular delay to sleep before rerunning the task. + * @param units + * The time units that the initialDelay and delay tasks are + * expressed in. + * @return A ScheduledFuture representing the newly created task. + */ + public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable r, + final long initialDelay, final long delay, final TimeUnit units) { + return schedule(new Runnable() { + public void run() { + r.run(); + Reactor.this.schedule(this, delay, units); + } + }, initialDelay, units); + } - public boolean cancel(ReactorTask task) { - return tasks.remove(task); - } + /** + * Cancel a task. This is used as a callback from ReactorTask. Removes a + * task from the tasks queue. + * + * TODO: remove the return value and assert that the return value of the + * remove call is the task itself? + * + * @param task + * The task to be canceled. + * @return The result of the queue removal (either the task itself, or null + * if there was nothing to remove - which should never be the case). + */ + boolean cancel(ReactorTask task) { + return tasks.remove(task); + } - public void shutdown() { - doShutdown = true; - } + /** + * Stop a running reactor. Note that this is <em>not</em> thread-safe; the + * caller is expected to be executing in the same thread as the reactor + * (i.e. one of the reactor's handlers or tasks). + */ + public void shutdown() { + doShutdown = true; + } } Modified: java-reactor/trunk/src/reactor/ReactorHandler.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorHandler.java 2008-02-24 22:40:15 UTC (rev 499) +++ java-reactor/trunk/src/reactor/ReactorHandler.java 2008-02-25 06:53:56 UTC (rev 500) @@ -3,9 +3,25 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; - +/** + * Handler for events pertaining to a socket (session). Currently there is only + * a single event, which is the reception of a packet. + * + * @author yang + * + */ public interface ReactorHandler { - public void handle(Session session, InetSocketAddress src, ByteBuffer buf); + /** + * Handle a received packet. + * + * @param session + * The session (socket) at which the packet was received. + * @param src + * The sender's socket address. + * @param buf + * The received packet. + */ + public void handle(Session session, InetSocketAddress src, ByteBuffer buf); } Modified: java-reactor/trunk/src/reactor/ReactorTask.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTask.java 2008-02-24 22:40:15 UTC (rev 499) +++ java-reactor/trunk/src/reactor/ReactorTask.java 2008-02-25 06:53:56 UTC (rev 500) @@ -8,88 +8,94 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException; +/** + * A task that was scheduled for execution in a reactor. + * + * @author yang + * + */ public class ReactorTask implements ScheduledFuture<Void> { - private final Runnable r; - private final long time; - private final Reactor reactor; + private final Runnable r; + private final long time; + private final Reactor reactor; - private static enum TaskState { - WAITING, RUNNING, DONE, CANCELLED - }; + private static enum TaskState { + WAITING, RUNNING, DONE, CANCELLED + }; - private TaskState state = TaskState.WAITING; + private TaskState state = TaskState.WAITING; - public ReactorTask(Runnable r, long time, Reactor reactor) { - this.r = r; - this.time = time; - this.reactor = reactor; - } + ReactorTask(Runnable r, long time, Reactor reactor) { + this.r = r; + this.time = time; + this.reactor = reactor; + } - public void run() { - if (state != TaskState.CANCELLED) { - assert state == TaskState.WAITING; - state = TaskState.RUNNING; - try { - r.run(); - } catch (Exception ex) { - } - state = TaskState.DONE; - } - } + void run() { + if (state != TaskState.CANCELLED) { + assert state == TaskState.WAITING; + state = TaskState.RUNNING; + try { + r.run(); + } catch (Exception ex) { + } + state = TaskState.DONE; + } + } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (state == TaskState.WAITING) { - state = TaskState.CANCELLED; - boolean b = reactor.cancel(this); - assert b; - return true; - } else { - return false; - } - } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (state == TaskState.WAITING) { + state = TaskState.CANCELLED; + boolean b = reactor.cancel(this); + assert b; + return true; + } else { + return false; + } + } - @Override - public Void get() throws InterruptedException, ExecutionException { - // TODO is this correct? - return null; - } + @Override + public Void get() throws InterruptedException, ExecutionException { + // TODO is this correct? + return null; + } - @Override - public Void get(long timeout, TimeUnit unit) throws InterruptedException, - ExecutionException, TimeoutException { - throw new NotImplementedException(); - } + @Override + public Void get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + throw new NotImplementedException(); + } - @Override - public boolean isCancelled() { - return state == TaskState.CANCELLED; - } + @Override + public boolean isCancelled() { + return state == TaskState.CANCELLED; + } - @Override - public boolean isDone() { - return state == TaskState.DONE; - } + @Override + public boolean isDone() { + return state == TaskState.DONE; + } - @Override - public long getDelay(TimeUnit units) { - long delay = time - System.currentTimeMillis(); - return delay > 0 ? TimeUnit.MILLISECONDS.convert(delay, units) : 0; - } + @Override + public long getDelay(TimeUnit units) { + long delay = time - System.currentTimeMillis(); + return delay > 0 ? TimeUnit.MILLISECONDS.convert(delay, units) : 0; + } - @Override - public int compareTo(Delayed o) { - if (false && o instanceof ReactorTask) { - ReactorTask other = (ReactorTask) o; - System.out.println(time + " vs " + other.time + " " - + getDelay(TimeUnit.MILLISECONDS) + " vs " - + other.getDelay(TimeUnit.MILLISECONDS)); - return Long.valueOf(time).compareTo(other.time); - } else { - return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)).compareTo( - o.getDelay(TimeUnit.MILLISECONDS)); - } - } + @Override + public int compareTo(Delayed o) { + if (false && o instanceof ReactorTask) { + ReactorTask other = (ReactorTask) o; + System.out.println(time + " vs " + other.time + " " + + getDelay(TimeUnit.MILLISECONDS) + " vs " + + other.getDelay(TimeUnit.MILLISECONDS)); + return Long.valueOf(time).compareTo(other.time); + } else { + return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)).compareTo( + o.getDelay(TimeUnit.MILLISECONDS)); + } + } } Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-02-24 22:40:15 UTC (rev 499) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-02-25 06:53:56 UTC (rev 500) @@ -9,7 +9,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; - +/** + * A simple test for the reactor.s + * @author yang + * + */ public class ReactorTest { ExecutorService e; Modified: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-02-24 22:40:15 UTC (rev 499) +++ java-reactor/trunk/src/reactor/Session.java 2008-02-25 06:53:56 UTC (rev 500) @@ -10,112 +10,160 @@ import java.util.ArrayList; import java.util.List; - +/** + * Represents a "session," which is probably a bad name for what is basically a + * wrapper around a datagram socket. + * + * @author yang + * + */ public class Session { - public final DatagramChannel channel; - public final ReactorHandler handler; - public final InetSocketAddress remoteSa, localSa; - public final int index; - // public final ByteBuffer readBuf = ByteBuffer.allocate(4096); - public final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); - public final List<ByteBuffer> pendingWrites = new ArrayList<ByteBuffer>(); + private final DatagramChannel channel; + private final ReactorHandler handler; + private final InetSocketAddress remoteSa; + private final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); + private final List<ByteBuffer> pendingWrites = new ArrayList<ByteBuffer>(); - public Session(InetSocketAddress remoteSa, InetSocketAddress localSa, - ReactorHandler handler, int index, Selector selector) { - this.handler = handler; - this.remoteSa = remoteSa; - this.localSa = localSa; - this.index = index; + /** + * Construct a new Session object. + * + * @param remoteSa + * Ignored. + * @param localSa + * The local socket address on which to send/receive packets. + * @param handler + * The handler for events on this socket. + * @param selector + * The selector that is used in the current reactor. + */ + Session(InetSocketAddress remoteSa, InetSocketAddress localSa, + ReactorHandler handler, Selector selector) { + this.handler = handler; + this.remoteSa = remoteSa; - try { - channel = DatagramChannel.open(); - channel.configureBlocking(false); - DatagramSocket socket = channel.socket(); - socket.setReuseAddress(true); - if (localSa != null) - socket.bind(localSa); - if (remoteSa != null) - channel.connect(remoteSa); + try { + channel = DatagramChannel.open(); + channel.configureBlocking(false); + DatagramSocket socket = channel.socket(); + socket.setReuseAddress(true); + if (localSa != null) + socket.bind(localSa); + if (remoteSa != null) + channel.connect(remoteSa); - channel.register(selector, SelectionKey.OP_READ, this); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } + channel.register(selector, SelectionKey.OP_READ, this); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } - // read messages is the priority - public void read(SelectionKey key) throws Exception { - while (true) { - try { - InetSocketAddress srcSa; + /** + * This is called by the reactor when there is a message to be received. + * Reading messages is the priority, so this is done before anything else. + * + * @param key + * The key into the selector's socket set representing our + * socket. + * @throws Exception + */ + void read(SelectionKey key) throws Exception { + while (true) { + try { + InetSocketAddress srcSa; - if (remoteSa == null) { - srcSa = (InetSocketAddress) channel.receive(readBuf); - } else { - int numRead = channel.read(readBuf); - if (numRead == -1) { - // Remote entity shut the socket down - // cleanly. - // Do - // the same from our end and cancel the - // channel. - key.channel().close(); - key.cancel(); - } - // TODO also handle numRead == 0 - srcSa = remoteSa; - } + if (remoteSa == null) { + srcSa = (InetSocketAddress) channel.receive(readBuf); + } else { + int numRead = channel.read(readBuf); + if (numRead == -1) { + // Remote entity shut the socket down + // cleanly. + // Do + // the same from our end and cancel the + // channel. + key.channel().close(); + key.cancel(); + } + // TODO also handle numRead == 0 + srcSa = remoteSa; + } - if (srcSa == null) { - break; - } + if (srcSa == null) { + break; + } - // after channel wrote to buf, set lim = pos, then pos = 0 - readBuf.flip(); - // callback - handler.handle(this, srcSa, readBuf); - // recycle buffer - readBuf.clear(); - } catch (IOException e) { - // The remote forcibly closed the connection, cancel - // the selection key and close the channel. - key.cancel(); - channel.close(); - } - } - } + // after channel wrote to buf, set lim = pos, then pos = 0 + readBuf.flip(); + // callback + handler.handle(this, srcSa, readBuf); + // recycle buffer + readBuf.clear(); + } catch (IOException e) { + // The remote forcibly closed the connection, cancel + // the selection key and close the channel. + key.cancel(); + channel.close(); + } + } + } - public void write(SelectionKey key) throws Exception { - // Write until there's not more data ... - while (!pendingWrites.isEmpty()) { - ByteBuffer buf = (ByteBuffer) pendingWrites.get(0); - channel.write(buf); - if (buf.remaining() > 0) { - // ... or the socket's buffer fills up - break; - } - pendingWrites.remove(0); - } + /** + * Handle the event where the socket is ready to be written. This is + * currently not used since the send() method blindly writes immediately to + * the socket - which is bad. + * + * @param key + * The key into the selector's socket set that represents our + * socket. + * @throws Exception + */ + void write(SelectionKey key) throws Exception { + // Write until there's not more data ... + while (!pendingWrites.isEmpty()) { + ByteBuffer buf = (ByteBuffer) pendingWrites.get(0); + channel.write(buf); + if (buf.remaining() > 0) { + // ... or the socket's buffer fills up + break; + } + pendingWrites.remove(0); + } - if (pendingWrites.isEmpty()) { - // We wrote away all data, so we're no longer - // interested - // in writing on this socket. Switch back to waiting - // for - // data. - key.interestOps(SelectionKey.OP_READ); - } - } + if (pendingWrites.isEmpty()) { + // We wrote away all data, so we're no longer + // interested + // in writing on this socket. Switch back to waiting + // for + // data. + key.interestOps(SelectionKey.OP_READ); + } + } - public void send(ByteBuffer writeBuf, InetSocketAddress dst) - throws Exception { - int bytes = channel.send(writeBuf, dst); - assert bytes == writeBuf.limit(); - } + /** + * Send messages from this socket. This does not enqueue anything onto the + * internal pendingWrites buffer if the socket's buffer is full, but instead + * triggers an assertion failure. + * + * @param writeBuf + * The packet to be sent. + * @param dst + * The destination socket address. + * @throws Exception + */ + public void send(ByteBuffer writeBuf, InetSocketAddress dst) + throws Exception { + int bytes = channel.send(writeBuf, dst); + assert bytes == writeBuf.limit(); + } - public void close() throws Exception { - channel.close(); - } + /** + * Close the underlying socket. + * + * @throws Exception + */ + public void close() throws Exception { + channel.close(); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |