[virtualcommons-svn] SF.net SVN: virtualcommons:[302] csidex/trunk/src/main/java/edu/asu/commons
Status: Beta
Brought to you by:
alllee
From: <al...@us...> - 2009-10-19 19:56:40
|
Revision: 302 http://virtualcommons.svn.sourceforge.net/virtualcommons/?rev=302&view=rev Author: alllee Date: 2009-10-19 19:56:28 +0000 (Mon, 19 Oct 2009) Log Message: ----------- fixing some bugs related to threading of the event channel and connection management. Here's what the flow of the bug looks like: 1. client breaks connection 2. server doesn't realize that the client has broken the connection until it tries to write to that socket 3. transmitting an event to all clients used to occur in a single thread of execution. 4. while transmitting the event (say a ShowInstructionsRequest), a DisconnectionRequest bubbles up out of the csidex framework. 5. Since the event is asking to be handled in the same thread of execution, it already holds the locks necessary to modify the clients map. So the disconnection request modifies the clients map while we are iterating through the clients map. Hence, a ConcurrentModificationException occurs. 6. Server dies, everyone goes home. The fix for this is to make sure that all DisconnectionRequests get handled in a separate thread of execution, so that that thread will block while trying to acquire a lock on the clients map. Perhaps a better fix would be to always run each event handler in a separate thread of execution by default, but for performance's sake we have been avoiding that. So now there is an additional boolean parameter in EventHandler.handle() that allows you to specify whether or not you want to run each handler in a separate thread of execution. 
Modified Paths: -------------- csidex/trunk/src/main/java/edu/asu/commons/event/AbstractEvent.java csidex/trunk/src/main/java/edu/asu/commons/event/EventConstraintChannel.java csidex/trunk/src/main/java/edu/asu/commons/event/EventHandler.java csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeChannel.java csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeProcessor.java csidex/trunk/src/main/java/edu/asu/commons/net/AbstractServerDispatcher.java csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java Modified: csidex/trunk/src/main/java/edu/asu/commons/event/AbstractEvent.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/event/AbstractEvent.java 2009-10-19 18:58:17 UTC (rev 301) +++ csidex/trunk/src/main/java/edu/asu/commons/event/AbstractEvent.java 2009-10-19 19:56:28 UTC (rev 302) @@ -16,9 +16,9 @@ private final static long serialVersionUID = -3443360054002127621L; - protected final Identifier id; + protected Identifier id; protected long creationTime; - private final String message; + protected String message; public AbstractEvent() { this(Identifier.NULL); Modified: csidex/trunk/src/main/java/edu/asu/commons/event/EventConstraintChannel.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/event/EventConstraintChannel.java 2009-10-19 18:58:17 UTC (rev 301) +++ csidex/trunk/src/main/java/edu/asu/commons/event/EventConstraintChannel.java 2009-10-19 19:56:28 UTC (rev 302) @@ -39,8 +39,8 @@ /** * Adds an event to the EventChannel which then propagates the event to - * all interested subscribers. EventHandler.handle(Event) is invoked in a separate - * thread of execution. + * all interested subscribers. EventHandler.handle(Event) is invoked in a + * single separate thread of execution. * * @param event The event to distribute via this EventChannel. 
*/ @@ -58,6 +58,25 @@ } } + public void handle(final Event event, boolean newThread) { + if (newThread) { + synchronized (eventListeners) { + for (final Map.Entry<EventHandler<Event>, EventConstraint> entry : eventListeners.entrySet()) { + if (entry.getValue().accepts(event)) { + new Thread() { + public void run() { + entry.getKey().handle(event); + } + }.start(); + } + } + } + } + else { + handle(event); + } + } + public void dispatch(final Event event) { handle(event); } Modified: csidex/trunk/src/main/java/edu/asu/commons/event/EventHandler.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/event/EventHandler.java 2009-10-19 18:58:17 UTC (rev 301) +++ csidex/trunk/src/main/java/edu/asu/commons/event/EventHandler.java 2009-10-19 19:56:28 UTC (rev 302) @@ -17,4 +17,11 @@ */ public void handle(E event); + /** + * Handles the given Event in a new thread of execution. + * @param event + * @param newThread + */ + public void handle(E event, boolean newThread); + } Modified: csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeChannel.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeChannel.java 2009-10-19 18:58:17 UTC (rev 301) +++ csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeChannel.java 2009-10-19 19:56:28 UTC (rev 302) @@ -19,6 +19,9 @@ new HashMap<Object, List<EventProcessor>>(); private final EventDispatcher defaultDispatcher; + + private final ThreadedDispatcher threadedDispatcher = new ThreadedDispatcher(); + private final SequentialDispatcher sequentialDispatcher = new SequentialDispatcher(); public final static EventTypeChannel INSTANCE = new EventTypeChannel(); @@ -32,10 +35,10 @@ public EventTypeChannel(boolean shouldThread) { if (shouldThread) { - defaultDispatcher = new ThreadedDispatcher(); + defaultDispatcher = threadedDispatcher; } else { - defaultDispatcher = new 
SequentialDispatcher(); + defaultDispatcher = sequentialDispatcher; } } @@ -120,9 +123,19 @@ if (event == null) return; defaultDispatcher.dispatch(event); } + + public void handle(Event event, boolean newThread) { + if (event == null) return; + if (newThread) { + threadedDispatcher.dispatch(event); + } + else { + sequentialDispatcher.dispatch(event); + } + } public void handleInThread(Event event) { - new ThreadedDispatcher().dispatch(event); + threadedDispatcher.dispatch(event); } public void dispatch(Event event) { @@ -156,13 +169,35 @@ } } - private class ThreadedDispatcher extends SequentialDispatcher { + private class ThreadedDispatcher implements EventDispatcher { public void dispatch(final Event event) { - new Thread() { - public void run() { - ThreadedDispatcher.super.dispatch(event); + final Class<? extends Event> eventClass = event.getClass(); + // first check handlers that want this and only this event type. + synchronized (equalTypesEventProcessorMap) { + List<EventProcessor> handlers = equalTypesEventProcessorMap.get(eventClass); + if (handlers != null) { + for (final EventProcessor<Event> handler: handlers) { + new Thread() { + public void run() { + handler.handle(event); + } + }.start(); + } } - }.start(); + } + // next, check to see if this event should be processed by the subtype processors. 
+ synchronized (acceptsSubtypesEventProcessors) { + for (final EventProcessor<Event> handler: acceptsSubtypesEventProcessors) { + if (handler.getEventClass().isInstance(event)) { + new Thread() { + public void run() { + handler.handle(event); + } + }.start(); + } + } + } + } } } Modified: csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeProcessor.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeProcessor.java 2009-10-19 18:58:17 UTC (rev 301) +++ csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeProcessor.java 2009-10-19 19:56:28 UTC (rev 302) @@ -57,6 +57,19 @@ }); } + public void handle(final E event, boolean newThread) { + if (newThread) { + new Thread() { + public void run() { + handle(event); + } + }.start(); + } + else { + handle(event); + } + } + public void handleInExperimentThread(E event) { throw new UnsupportedOperationException("Override handleInExperimentThread and make sure you do NOT override handle(E) if you want to use single-threaded event handling."); } Modified: csidex/trunk/src/main/java/edu/asu/commons/net/AbstractServerDispatcher.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/net/AbstractServerDispatcher.java 2009-10-19 18:58:17 UTC (rev 301) +++ csidex/trunk/src/main/java/edu/asu/commons/net/AbstractServerDispatcher.java 2009-10-19 19:56:28 UTC (rev 302) @@ -104,18 +104,26 @@ logger.info(getClass() + " listening on port:" + port); try { bind(port); - listening = true; - while ( listening ) { - processIncomingConnections(); - // disconnect any pending disconnected clients and so on. 
- performConnectionMaintenance(); - } } catch (IOException e) { e.printStackTrace(); - logger.severe("unrecoverable IOException: " + e); + logger.severe(String.format("Couldn't bind to port %d due to exception [%s] - shutting down.", port, e)); shutdown(); + return; } + listening = true; + while ( listening ) { + try { + processIncomingConnections(); + // disconnect any pending disconnected clients + performConnectionMaintenance(); + } + catch (IOException e) { + e.printStackTrace(); + logger.severe("IO Exception while processing incoming connections: " + e); + } + + } } }; } Modified: csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java =================================================================== --- csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java 2009-10-19 18:58:17 UTC (rev 301) +++ csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java 2009-10-19 19:56:28 UTC (rev 302) @@ -75,7 +75,6 @@ public void write(Event event) { try { - // FIXME: consider caching the object output stream? synchronized (socket) { // ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); // out.writeObject(event); @@ -126,7 +125,7 @@ } private void requestDisconnection(Exception exception) { - dispatcher.getLocalEventHandler().handle(new DisconnectionRequest(id, exception)); + dispatcher.getLocalEventHandler().handle(new DisconnectionRequest(id, exception), true); } public void run() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |