[virtualcommons-svn] SF.net SVN: virtualcommons:[296] csidex/trunk/src/main/java/edu/asu/commons
Status: Beta
Brought to you by:
alllee
From: <al...@us...> - 2009-10-13 17:10:03
|
Revision: 296 http://virtualcommons.svn.sourceforge.net/virtualcommons/?rev=296&view=rev Author: alllee Date: 2009-10-13 17:09:51 +0000 (Tue, 13 Oct 2009) Log Message: ----------- adding cached SocketDispatcherWorker that uses cached Object(Input|Output)Streams instead of recreating them each time. Should probably create a subtype of both that uses either and make it configurable. Modified Paths: -------------- csidex/trunk/src/main/java/edu/asu/commons/event/AbstractEvent.java csidex/trunk/src/main/java/edu/asu/commons/event/AbstractPersistableEvent.java csidex/trunk/src/main/java/edu/asu/commons/experiment/AbstractExperiment.java csidex/trunk/src/main/java/edu/asu/commons/experiment/StateMachine.java csidex/trunk/src/main/java/edu/asu/commons/net/ClientSocketDispatcher.java csidex/trunk/src/main/java/edu/asu/commons/net/DispatcherFactory.java csidex/trunk/src/main/java/edu/asu/commons/net/ServerSocketDispatcher.java csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java csidex/trunk/src/main/java/edu/asu/commons/util/Duration.java Modified: csidex/trunk/src/main/java/edu/asu/commons/event/AbstractEvent.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/event/AbstractEvent.java 2009-10-13 17:09:24 UTC (rev 295) +++ csidex/trunk/src/main/java/edu/asu/commons/event/AbstractEvent.java 2009-10-13 17:09:51 UTC (rev 296) @@ -16,7 +16,7 @@ private final static long serialVersionUID = -3443360054002127621L; - protected Identifier id; + protected final Identifier id; protected long creationTime; private final String message; @@ -34,9 +34,7 @@ "a null Identifier"); } this.id = id; - // FIXME: switch to System.currentTimeNanos() once we become Java 1.5 - // compliant. - this.creationTime = System.currentTimeMillis(); + this.creationTime = System.nanoTime(); this.message = message; } @@ -46,10 +44,15 @@ public Identifier getId() { return id; } + + public String getMessage() { + return message; + } public String toString() { if (message == null) { - return getClass() + " id [" + id + "], creation time [ " + creationTime + "]"; + return String.format("%s - id: %s, created on %d", getClass(), id, creationTime); +// return getClass() + " id [" + id + "], creation time [ " + creationTime + "]"; } return message; } Modified: csidex/trunk/src/main/java/edu/asu/commons/event/AbstractPersistableEvent.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/event/AbstractPersistableEvent.java 2009-10-13 17:09:24 UTC (rev 295) +++ csidex/trunk/src/main/java/edu/asu/commons/event/AbstractPersistableEvent.java 2009-10-13 17:09:51 UTC (rev 296) @@ -7,7 +7,7 @@ * $Id$ * * Base class that provides uniqueness with respect to the time-ordered stream - * of Events originating from a single place. + * of Events originating from a single place. * * @author <a href='mailto:All...@as...'>Allen Lee</a> * @version $Revision$ @@ -51,6 +51,6 @@ } public final void timestamp() { - super.creationTime = System.currentTimeMillis(); + super.creationTime = System.nanoTime(); } } Modified: csidex/trunk/src/main/java/edu/asu/commons/experiment/AbstractExperiment.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/experiment/AbstractExperiment.java 2009-10-13 17:09:24 UTC (rev 295) +++ csidex/trunk/src/main/java/edu/asu/commons/experiment/AbstractExperiment.java 2009-10-13 17:09:51 UTC (rev 296) @@ -37,19 +37,6 @@ private final List<Command> commands = new LinkedList<Command>(); private Thread serverThread; -// private final Thread eventHandlingThread = new Thread() { -// public void run() { -// while ( isRunning() ) { -// synchronized (commands) { -// for (Iterator<Command> iterator = commands.iterator(); iterator.hasNext(); ) { -// iterator.next().execute(); -// iterator.remove(); -// } -// } -// AbstractExperiment.sleep(50); -// } -// } -// }; private boolean running; Modified: csidex/trunk/src/main/java/edu/asu/commons/experiment/StateMachine.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/experiment/StateMachine.java 2009-10-13 17:09:24 UTC (rev 295) +++ csidex/trunk/src/main/java/edu/asu/commons/experiment/StateMachine.java 2009-10-13 17:09:51 UTC (rev 296) @@ -5,7 +5,7 @@ /** * $Id$ * - * Very basic experiment state machine. + * Basic experiment state machine. * * @author <a href='mailto:All...@as...'>Allen Lee</a> * @version $Revision$ @@ -13,4 +13,5 @@ public interface StateMachine { public void initialize(); public void execute(Dispatcher dispatcher); +// public void execute(); } \ No newline at end of file Modified: csidex/trunk/src/main/java/edu/asu/commons/net/ClientSocketDispatcher.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/net/ClientSocketDispatcher.java 2009-10-13 17:09:24 UTC (rev 295) +++ csidex/trunk/src/main/java/edu/asu/commons/net/ClientSocketDispatcher.java 2009-10-13 17:09:51 UTC (rev 296) @@ -47,13 +47,14 @@ Socket socket = new Socket(); socket.connect(inetSocketAddress); // block while we wait for the ServerSocketDispatcher to assign an - // Identifier to us. - Event event = SocketDispatcherWorker.readEvent(socket); + // Identifier to us. The construction of an ObjectInputStream blocks. + worker = new SocketDispatcherWorker(this, socket); + ConnectionEvent event = (ConnectionEvent) worker.readEvent(); assert event instanceof ConnectionEvent; Identifier id = event.getId(); - worker = new SocketDispatcherWorker(this, id, socket); + worker.setId(id); worker.start(); - getLocalEventHandler().handle((ConnectionEvent) event); + getLocalEventHandler().handle(event); return id; } catch (IOException e) { Modified: csidex/trunk/src/main/java/edu/asu/commons/net/DispatcherFactory.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/net/DispatcherFactory.java 2009-10-13 17:09:24 UTC (rev 295) +++ csidex/trunk/src/main/java/edu/asu/commons/net/DispatcherFactory.java 2009-10-13 17:09:51 UTC (rev 296) @@ -23,8 +23,8 @@ } public ClientDispatcher createClientDispatcher(EventChannel channel) { -// return new ClientSocketDispatcher(channel); - return new NioDispatcher(channel, 1); + return new ClientSocketDispatcher(channel); +// return new NioDispatcher(channel, 1); } public ServerDispatcher createServerDispatcher(EventChannel channel) { @@ -32,7 +32,7 @@ } public ServerDispatcher createServerDispatcher(EventChannel channel, int workerPoolSize) { - return new NioDispatcher(channel, workerPoolSize); -// return new ServerSocketDispatcher(channel, workerPoolSize); +// return new NioDispatcher(channel, workerPoolSize); + return new ServerSocketDispatcher(channel, workerPoolSize); } } Modified: csidex/trunk/src/main/java/edu/asu/commons/net/ServerSocketDispatcher.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/net/ServerSocketDispatcher.java 2009-10-13 17:09:24 UTC (rev 295) +++ csidex/trunk/src/main/java/edu/asu/commons/net/ServerSocketDispatcher.java 2009-10-13 17:09:51 UTC (rev 296) @@ -91,8 +91,7 @@ return; } Identifier id = new SocketIdentifier(incoming); - // FIXME: consider pooling socket connections - SocketDispatcherWorker worker = new SocketDispatcherWorker(this, id, incoming); + SocketDispatcherWorker worker = new SocketDispatcherWorker(this, incoming, id); // immediately write a ConnectionEvent to the incoming connection. ConnectionEvent event = new ConnectionEvent(id); worker.write(event); Modified: csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java 2009-10-13 17:09:24 UTC (rev 295) +++ csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java 2009-10-13 17:09:51 UTC (rev 296) @@ -27,13 +27,22 @@ private final Dispatcher dispatcher; private final Socket socket; - private final Identifier id; + private Identifier id; private Thread workerThread; private boolean running; + private ObjectOutputStream cachedOut; + private ObjectInputStream cachedIn; - protected SocketDispatcherWorker(Dispatcher dispatcher, Identifier id, Socket socket) throws IOException { + protected SocketDispatcherWorker(Dispatcher dispatcher, Socket socket) throws IOException { this.dispatcher = dispatcher; this.socket = socket; + cachedOut = new ObjectOutputStream(socket.getOutputStream()); + cachedOut.flush(); + cachedIn = new ObjectInputStream(socket.getInputStream()); + } + + protected SocketDispatcherWorker(Dispatcher dispatcher, Socket socket, Identifier id) throws IOException { + this(dispatcher, socket); this.id = id; } @@ -68,9 +77,12 @@ try { // FIXME: consider caching the object output stream? synchronized (socket) { - ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); - out.writeObject(event); - out.flush(); +// ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); +// out.writeObject(event); +// out.flush(); + cachedOut.reset(); + cachedOut.writeObject(event); + cachedOut.flush(); } // XXX: don't close() the stream. It closes the wrapped Socket // output stream and essentially fubars the socket. @@ -96,7 +108,9 @@ } public synchronized Event readEvent() throws IOException, ClassNotFoundException { - return SocketDispatcherWorker.readEvent(socket); +// return SocketDispatcherWorker.readEvent(socket); + + return (Event) cachedIn.readObject(); } public Socket getSocket() { @@ -117,10 +131,12 @@ public void run() { try { +// cachedIn = new ObjectInputStream(socket.getInputStream()); while ( isRunning() ) { // try to read Events from the socket try { - dispatcher.getLocalEventHandler().handle( readEvent(socket) ); + dispatcher.getLocalEventHandler().handle( readEvent() ); + // wake up all threads waiting on the dispatcher.. synchronized (dispatcher) { dispatcher.notifyAll(); } @@ -153,4 +169,12 @@ requestDisconnection(e); } } + + /** + * Should only be invoked by the client after receiving its Identifier from the server. + * @param id + */ + void setId(Identifier id) { + this.id = id; + } } Modified: csidex/trunk/src/main/java/edu/asu/commons/util/Duration.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/util/Duration.java 2009-10-13 17:09:24 UTC (rev 295) +++ csidex/trunk/src/main/java/edu/asu/commons/util/Duration.java 2009-10-13 17:09:51 UTC (rev 296) @@ -8,7 +8,7 @@ * * Inspired/derived from the timeandmoney.sf.net project's API. * - * @author <a href='al...@cs...'>Allen Lee</a> + * @author <a href='all...@as...'>Allen Lee</a> * @version $Revision$ */ public class Duration implements Serializable { @@ -17,6 +17,7 @@ // FIXME: what's a better name for the amount of time a duration is supposed // to take up? TimeAndMoney uses 'quantity' I believe. + // Delta is always specified in milliseconds. private final long delta; // number of times this Duration has been restart()-ed. private long startCount = 0; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |