Thread: [Assorted-commits] SF.net SVN: assorted: [468] java-reactor
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-02-18 20:04:43
|
Revision: 468 http://assorted.svn.sourceforge.net/assorted/?rev=468&view=rev Author: yangzhang Date: 2008-02-18 12:04:44 -0800 (Mon, 18 Feb 2008) Log Message: ----------- java-reactor: done, son! Added Paths: ----------- java-reactor/ java-reactor/trunk/ java-reactor/trunk/src/ java-reactor/trunk/src/build java-reactor/trunk/src/reactor/ java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorHandler.java java-reactor/trunk/src/reactor/ReactorTask.java java-reactor/trunk/src/reactor/ReactorTest.java java-reactor/trunk/src/reactor/Session.java Added: java-reactor/trunk/src/build =================================================================== --- java-reactor/trunk/src/build (rev 0) +++ java-reactor/trunk/src/build 2008-02-18 20:04:44 UTC (rev 468) @@ -0,0 +1,2 @@ +java-reactor: + srcs: [reactor/Reactor.java] Added: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java (rev 0) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-02-18 20:04:44 UTC (rev 468) @@ -0,0 +1,96 @@ +package reactor; + +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.ArrayList; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + + +public class Reactor { + + private final Selector selector; + private final List<Session> sessions = new ArrayList<Session>(); + private boolean doShutdown = false; + private final PriorityQueue<ReactorTask> tasks = new PriorityQueue<ReactorTask>(); + + public Reactor() throws Exception { + selector = Selector.open(); + } + + public Session register(InetSocketAddress remoteSa, + InetSocketAddress localSa, ReactorHandler handler) { + Session session = new Session(remoteSa, localSa, handler, sessions + .size(), selector); + sessions.add(session); + return session; + } + + public void react() throws Exception { + while (true) { + if (doShutdown) + break; + + int updated; + if (tasks.isEmpty()) { + updated = selector.select(); + } else { + long t = tasks.peek().getDelay(TimeUnit.MILLISECONDS); + updated = t > 0 ? selector.select(t) : 0; + } + + if (updated > 0) { + Set<SelectionKey> keys = selector.selectedKeys(); + for (SelectionKey key : keys) { + if (key.isValid()) { + if (key.isReadable()) { + ((Session) key.attachment()).read(key); + } else if (key.isWritable()) { + ((Session) key.attachment()).write(key); + } + } + } + keys.clear(); + } else { + // TODO impose limit on # things to run at once (perhaps even + // specify costs) + while (!tasks.isEmpty() + && tasks.peek().getDelay(TimeUnit.MILLISECONDS) == 0L) { + ReactorTask task = tasks.remove(); + task.run(); + } + } + } + selector.close(); + } + + public ScheduledFuture<?> schedule(Runnable r, long delay, TimeUnit units) { + ReactorTask task = new ReactorTask(r, System.currentTimeMillis() + + units.toMillis(delay), this); + tasks.add(task); + return task; + } + + public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable r, + final long initialDelay, final long delay, final TimeUnit units) { + return schedule(new Runnable() { + public void run() { + r.run(); + Reactor.this.schedule(this, delay, units); + } + }, initialDelay, units); + } + + public boolean cancel(ReactorTask task) { + return tasks.remove(task); + } + + public void shutdown() { + doShutdown = true; + } + +} Added: java-reactor/trunk/src/reactor/ReactorHandler.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/ReactorHandler.java 2008-02-18 20:04:44 UTC (rev 468) @@ -0,0 +1,11 @@ +package reactor; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + + +public interface ReactorHandler { + + public void handle(Session session, InetSocketAddress src, ByteBuffer buf); + +} Added: java-reactor/trunk/src/reactor/ReactorTask.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTask.java (rev 0) +++ java-reactor/trunk/src/reactor/ReactorTask.java 2008-02-18 20:04:44 UTC (rev 468) @@ -0,0 +1,95 @@ +package reactor; + +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +public class ReactorTask implements ScheduledFuture<Void> { + + private final Runnable r; + private final long time; + private final Reactor reactor; + + private static enum TaskState { + WAITING, RUNNING, DONE, CANCELLED + }; + + private TaskState state = TaskState.WAITING; + + public ReactorTask(Runnable r, long time, Reactor reactor) { + this.r = r; + this.time = time; + this.reactor = reactor; + } + + public void run() { + if (state != TaskState.CANCELLED) { + assert state == TaskState.WAITING; + state = TaskState.RUNNING; + try { + r.run(); + } catch (Exception ex) { + } + state = TaskState.DONE; + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (state == TaskState.WAITING) { + state = TaskState.CANCELLED; + boolean b = reactor.cancel(this); + assert b; + return true; + } else { + return false; + } + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + // TODO is this correct? + return null; + } + + @Override + public Void get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + throw new NotImplementedException(); + } + + @Override + public boolean isCancelled() { + return state == TaskState.CANCELLED; + } + + @Override + public boolean isDone() { + return state == TaskState.DONE; + } + + @Override + public long getDelay(TimeUnit units) { + long delay = time - System.currentTimeMillis(); + return delay > 0 ? TimeUnit.MILLISECONDS.convert(delay, units) : 0; + } + + @Override + public int compareTo(Delayed o) { + if (false && o instanceof ReactorTask) { + ReactorTask other = (ReactorTask) o; + System.out.println(time + " vs " + other.time + " " + + getDelay(TimeUnit.MILLISECONDS) + " vs " + + other.getDelay(TimeUnit.MILLISECONDS)); + return Long.valueOf(time).compareTo(other.time); + } else { + return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)).compareTo( + o.getDelay(TimeUnit.MILLISECONDS)); + } + } + +} Added: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java (rev 0) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-02-18 20:04:44 UTC (rev 468) @@ -0,0 +1,105 @@ +package reactor; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + + +public class ReactorTest { + ExecutorService e; + + public ReactorTest() { + e = Executors.newCachedThreadPool(); + } + + public void spawn(final Runnable r) { + e.submit(new Runnable() { + public void run() { + try { + r.run(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + + private Runnable makeRunnable(final int i) { + return new Runnable() { + public void run() { + System.out.println(i); + } + }; + } + + public void test() throws Exception { + InetAddress localhost = InetAddress.getLocalHost(); + int serverPort = 11111, clientPort = 22222; + final InetSocketAddress serverSa, clientSa; + serverSa = new InetSocketAddress(localhost, serverPort); + clientSa = new InetSocketAddress(localhost, clientPort); + + final ReactorHandler handler = new ReactorHandler() { + public void handle(Session service, InetSocketAddress src, + ByteBuffer buf) { + System.out.println("received: " + buf); + } + }; + + spawn(new Runnable() { + public void run() { + try { + Reactor r = new Reactor(); + r.register(null, serverSa, handler); + + for (int i = 0; i < 10; i++) + r.schedule(makeRunnable(i), 200 * i, + TimeUnit.MILLISECONDS); + + Thread.sleep(1000); + r.react(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + }); + + spawn(new Runnable() { + public void run() { + try { + Reactor r = new Reactor(); + ByteBuffer writeBuf = ByteBuffer.allocate(5); + Session s = r.register(null, clientSa, handler); + Thread.sleep(2000); + s.send(writeBuf, clientSa); + r.react(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + }); + + spawn(new Runnable() { + public void run() { + try { + byte[] writeBuf = new byte[] { 0, 1, 2, 3 }; + Thread.sleep(3000); + new DatagramSocket().send(new DatagramPacket(writeBuf, + writeBuf.length, serverSa)); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + }); + } + + public static void main(String args[]) throws Exception { + new ReactorTest().test(); + } + +} Added: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java (rev 0) +++ java-reactor/trunk/src/reactor/Session.java 2008-02-18 20:04:44 UTC (rev 468) @@ -0,0 +1,121 @@ +package reactor; + +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.ArrayList; +import java.util.List; + + +public class Session { + + public final DatagramChannel channel; + public final ReactorHandler handler; + public final InetSocketAddress remoteSa, localSa; + public final int index; + // public final ByteBuffer readBuf = ByteBuffer.allocate(4096); + public final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); + public final List<ByteBuffer> pendingWrites = new ArrayList<ByteBuffer>(); + + public Session(InetSocketAddress remoteSa, InetSocketAddress localSa, + ReactorHandler handler, int index, Selector selector) { + this.handler = handler; + this.remoteSa = remoteSa; + this.localSa = localSa; + this.index = index; + + try { + channel = DatagramChannel.open(); + channel.configureBlocking(false); + DatagramSocket socket = channel.socket(); + socket.setReuseAddress(true); + if (localSa != null) + socket.bind(localSa); + if (remoteSa != null) + channel.connect(remoteSa); + + channel.register(selector, SelectionKey.OP_READ, this); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + // read messages is the priority + public void read(SelectionKey key) throws Exception { + while (true) { + try { + InetSocketAddress srcSa; + + if (remoteSa == null) { + srcSa = (InetSocketAddress) channel.receive(readBuf); + } else { + int numRead = channel.read(readBuf); + if (numRead == -1) { + // Remote entity shut the socket down + // cleanly. + // Do + // the same from our end and cancel the + // channel. + key.channel().close(); + key.cancel(); + } + // TODO also handle numRead == 0 + srcSa = remoteSa; + } + + if (srcSa == null) { + break; + } + + // after channel wrote to buf, set lim = pos, then pos = 0 + readBuf.flip(); + // callback + handler.handle(this, srcSa, readBuf); + // recycle buffer + readBuf.clear(); + } catch (IOException e) { + // The remote forcibly closed the connection, cancel + // the selection key and close the channel. + key.cancel(); + channel.close(); + } + } + } + + public void write(SelectionKey key) throws Exception { + // Write until there's not more data ... + while (!pendingWrites.isEmpty()) { + ByteBuffer buf = (ByteBuffer) pendingWrites.get(0); + channel.write(buf); + if (buf.remaining() > 0) { + // ... or the socket's buffer fills up + break; + } + pendingWrites.remove(0); + } + + if (pendingWrites.isEmpty()) { + // We wrote away all data, so we're no longer + // interested + // in writing on this socket. Switch back to waiting + // for + // data. + key.interestOps(SelectionKey.OP_READ); + } + } + + public void send(ByteBuffer writeBuf, InetSocketAddress dst) + throws Exception { + int bytes = channel.send(writeBuf, dst); + assert bytes == writeBuf.limit(); + } + + public void close() throws Exception { + channel.close(); + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-03-04 03:08:14
|
Revision: 596 http://assorted.svn.sourceforge.net/assorted/?rev=596&view=rev Author: yangzhang Date: 2008-03-03 19:08:18 -0800 (Mon, 03 Mar 2008) Log Message: ----------- tagged 0.1 release Added Paths: ----------- java-reactor/tags/ java-reactor/tags/0.1/ Copied: java-reactor/tags/0.1 (from rev 593, java-reactor/trunk) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |