[virtualcommons-svn] SF.net SVN: virtualcommons:[293] csidex/trunk/src/main/java/edu/asu/commons
Status: Beta
Brought to you by:
alllee
|
From: <al...@us...> - 2009-10-10 01:18:49
|
Revision: 293
http://virtualcommons.svn.sourceforge.net/virtualcommons/?rev=293&view=rev
Author: alllee
Date: 2009-10-10 01:18:43 +0000 (Sat, 10 Oct 2009)
Log Message:
-----------
Minor code hygiene; added a getDefaultRoundDuration() method that can be
overridden by subtypes of ExperimentRoundParameters; continued work on
improving connection management.
Modified Paths:
--------------
csidex/trunk/src/main/java/edu/asu/commons/conf/ExperimentRoundParameters.java
csidex/trunk/src/main/java/edu/asu/commons/event/AbstractPersistableEvent.java
csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeChannel.java
csidex/trunk/src/main/java/edu/asu/commons/experiment/AbstractExperiment.java
csidex/trunk/src/main/java/edu/asu/commons/experiment/Persister.java
csidex/trunk/src/main/java/edu/asu/commons/net/NioDispatcher.java
csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java
csidex/trunk/src/main/java/edu/asu/commons/net/event/DisconnectionRequest.java
Modified: csidex/trunk/src/main/java/edu/asu/commons/conf/ExperimentRoundParameters.java
===================================================================
--- csidex/trunk/src/main/java/edu/asu/commons/conf/ExperimentRoundParameters.java 2009-10-10 01:16:33 UTC (rev 292)
+++ csidex/trunk/src/main/java/edu/asu/commons/conf/ExperimentRoundParameters.java 2009-10-10 01:18:43 UTC (rev 293)
@@ -15,7 +15,7 @@
* @author <a href='al...@cs...'>Allen Lee</a>
* @version $Revision$
*/
-
+@SuppressWarnings("unchecked")
public interface ExperimentRoundParameters<T extends ExperimentConfiguration> extends Serializable {
public String getInstructions();
@@ -73,8 +73,16 @@
}
public Duration getRoundDuration() {
- return Duration.create(assistant.getIntProperty("duration", 240));
+ return Duration.create(assistant.getIntProperty("duration", getDefaultRoundDuration()));
}
+
+ /**
+ * Override to set up a different default round duration.
+ * @return
+ */
+ protected int getDefaultRoundDuration() {
+ return 240;
+ }
public void setParentConfiguration(E parentConfiguration) {
this.parentConfiguration = parentConfiguration;
Modified: csidex/trunk/src/main/java/edu/asu/commons/event/AbstractPersistableEvent.java
===================================================================
--- csidex/trunk/src/main/java/edu/asu/commons/event/AbstractPersistableEvent.java 2009-10-10 01:16:33 UTC (rev 292)
+++ csidex/trunk/src/main/java/edu/asu/commons/event/AbstractPersistableEvent.java 2009-10-10 01:18:43 UTC (rev 293)
@@ -15,6 +15,8 @@
public abstract class AbstractPersistableEvent extends AbstractEvent
implements PersistableEvent, Comparable<AbstractPersistableEvent> {
+ private static final long serialVersionUID = -8335415577272927846L;
+
private static long classCounter = 0;
private final long ordinal;
Modified: csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeChannel.java
===================================================================
--- csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeChannel.java 2009-10-10 01:16:33 UTC (rev 292)
+++ csidex/trunk/src/main/java/edu/asu/commons/event/EventTypeChannel.java 2009-10-10 01:18:43 UTC (rev 293)
@@ -72,6 +72,10 @@
public <E extends Event> void subscribe(EventProcessor<E> handler) {
add(handler);
}
+
+ public <E extends Event> boolean unsubscribe(EventProcessor<E> handler) {
+ return remove(handler);
+ }
public <E extends Event> boolean remove(EventProcessor<E> handler) {
synchronized (acceptsSubtypesEventProcessors) {
@@ -99,10 +103,6 @@
}
}
- public <E extends Event> boolean unsubscribe(EventProcessor<E> handler) {
- return remove(handler);
- }
-
int getNumberOfOwners() {
return owners.size();
}
@@ -136,6 +136,7 @@
private class SequentialDispatcher implements EventDispatcher {
public void dispatch(Event event) {
final Class<? extends Event> eventClass = event.getClass();
+ // first check handlers that want this and only this event type.
synchronized (equalTypesEventProcessorMap) {
List<EventProcessor> handlers = equalTypesEventProcessorMap.get(eventClass);
if (handlers != null) {
@@ -144,6 +145,7 @@
}
}
}
+ // next, check to see if this event should be processed by the subtype processors.
synchronized (acceptsSubtypesEventProcessors) {
for (final EventProcessor<Event> handler: acceptsSubtypesEventProcessors) {
if (handler.getEventClass().isInstance(event)) {
@@ -154,7 +156,6 @@
}
}
- // FIXME: turn into decorator pattern by having an EventDispatcher delegate? Only if there's less code?
private class ThreadedDispatcher extends SequentialDispatcher {
public void dispatch(final Event event) {
new Thread() {
Modified: csidex/trunk/src/main/java/edu/asu/commons/experiment/AbstractExperiment.java
===================================================================
--- csidex/trunk/src/main/java/edu/asu/commons/experiment/AbstractExperiment.java 2009-10-10 01:16:33 UTC (rev 292)
+++ csidex/trunk/src/main/java/edu/asu/commons/experiment/AbstractExperiment.java 2009-10-10 01:18:43 UTC (rev 293)
@@ -143,6 +143,12 @@
return channel;
}
+ /**
+ * Subtypes should always use this to add an event processor in lieu of talking with the event channel
+ * directly, as it ensures that the command queue can be used.
+ *
+ * @param processor
+ */
protected void addEventProcessor(EventTypeProcessor<? extends Event> processor) {
processor.setExperiment(this);
channel.add(this, processor);
@@ -183,7 +189,7 @@
}
catch (Exception exception) {
getLogger().severe("Attempting to recover from exception: " + exception);
- getLogger().throwing(getClass().getName(), "experimentThread.run()", exception);
+ getLogger().throwing(getClass().getName(), "experimentServerThread.run()", exception);
exception.printStackTrace();
continue;
}
Modified: csidex/trunk/src/main/java/edu/asu/commons/experiment/Persister.java
===================================================================
--- csidex/trunk/src/main/java/edu/asu/commons/experiment/Persister.java 2009-10-10 01:16:33 UTC (rev 292)
+++ csidex/trunk/src/main/java/edu/asu/commons/experiment/Persister.java 2009-10-10 01:18:43 UTC (rev 293)
@@ -58,7 +58,7 @@
private R roundConfiguration;
private final SortedSet<PersistableEvent> actions;
- private final SortedSet<ChatRequest> chats;
+ private final SortedSet<ChatRequest> chatRequests;
private String experimentSaveDirectory;
private String persistenceDirectory;
private EventChannel channel;
@@ -74,7 +74,7 @@
public Persister(T experimentConfiguration) {
this.persistenceDirectory = experimentConfiguration.getPersistenceDirectory();
this.actions = new TreeSet<PersistableEvent>();
- this.chats = new TreeSet<ChatRequest>();
+ this.chatRequests = new TreeSet<ChatRequest>();
// initialize persister with first round parameters
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(ROUND_SAVE_DIRECTORY_FORMAT);
this.experimentSaveDirectory = simpleDateFormat.format(new Date());
@@ -145,11 +145,18 @@
}
}
+ public void clearChatData() {
+ chatRequests.clear();
+ }
+
public void store(ChatRequest request) {
request.timestamp();
- // FIXME: this is inefficient, but chat data shouldn't be too huge.. should it?
- synchronized (chats) {
- chats.add(request);
+ // FIXME: right now all cumulative chat requests are stored in each round file. Should either
+ // switch to one set of chats per round, or a single chat file, as in the chatLogger. There
+ // is some difficulty in figuring out when exactly to clear out all the old chat requests in a
+ // flexible manner. Could probably do it when the BeginCommunicationRequest is handled, actually.
+ synchronized (chatRequests) {
+ chatRequests.add(request);
}
chatLogger.log(Level.ALL,
String.format("%s, %s, %s, %s",
@@ -200,7 +207,6 @@
*/
private static void processSaveDirectory(File directory, List<SaveFileProcessor> processors) {
try {
-// ExperimentConfiguration experimentConfiguration = restoreExperimentConfiguration(directory, DEFAULT_EXPERIMENT_CONFIGURATION_FILE);
int numberOfRounds = restoreExperimentConfiguration(directory).getAllParameters().size();
for (int roundNumber = 0; roundNumber < numberOfRounds; roundNumber++) {
SavedRoundData savedRoundData = restoreSavedRoundData(directory, roundNumber);
@@ -256,7 +262,6 @@
}
}
}
-
public final <E extends DataModel<R>> void persist(E serverDataModel) {
save(serverDataModel);
@@ -371,8 +376,8 @@
synchronized (actions) {
oos.writeObject(actions);
}
- synchronized (chats) {
- oos.writeObject(chats);
+ synchronized (chatRequests) {
+ oos.writeObject(chatRequests);
}
oos.flush();
}
Modified: csidex/trunk/src/main/java/edu/asu/commons/net/NioDispatcher.java
===================================================================
--- csidex/trunk/src/main/java/edu/asu/commons/net/NioDispatcher.java 2009-10-10 01:16:33 UTC (rev 292)
+++ csidex/trunk/src/main/java/edu/asu/commons/net/NioDispatcher.java 2009-10-10 01:18:43 UTC (rev 293)
@@ -21,7 +21,6 @@
import edu.asu.commons.event.EventChannel;
import edu.asu.commons.net.event.ConnectionEvent;
import edu.asu.commons.net.event.DisconnectionEvent;
-import edu.asu.commons.util.Duration;
/**
@@ -52,7 +51,7 @@
// either a single NioDispatcherWorker or an aggregate WorkerPool that
// constructs NioDispatcherWorkers.
private Worker<SocketChannel> worker;
-
+
// we need to maintain a mapping between Identifiers -> SocketChannel as
// well as SocketChannel -> Identifier due to the way nio inherently
// works; when data is incoming across the network the Selector is woken
@@ -129,7 +128,6 @@
private Identifier readConnectionEvent(SocketChannel connection) throws IOException {
InputStream in = connection.socket().getInputStream();
// read past the int header
-
in.read(); in.read(); in.read(); in.read();
ObjectInputStream ois = new ObjectInputStream(in);
try {
@@ -137,9 +135,8 @@
return event.getId();
}
catch (Exception e) {
- e.printStackTrace();
+ throw new RuntimeException("Could not read connection event", e);
}
- return null;
}
private void addMapping(Identifier id, SocketChannel channel) {
@@ -448,6 +445,12 @@
public Identifier process(SocketChannel incoming) {
synchronized (channels) {
+ try {
+ selector.selectNow();
+ }
+ catch (IOException exception) {
+ exception.printStackTrace();
+ }
channels.add(incoming);
}
synchronized (incoming) {
@@ -526,20 +529,22 @@
{
SocketChannel incoming = iter.next();
iter.remove();
- try {
- incoming.configureBlocking(false);
- incoming.register(selector, SelectionKey.OP_READ);
- }
- catch (IOException e) {
- // recoverable, incoming connection was broken,
- // just ignore it and move on.
- e.printStackTrace();
- continue;
- }
- finally {
- synchronized (incoming) {
- incoming.notifyAll();
+ if (incoming.isOpen()) {
+ try {
+ incoming.configureBlocking(false);
+ incoming.register(selector, SelectionKey.OP_READ);
}
+ catch (IOException e) {
+ // recoverable, incoming connection was broken,
+ // just ignore it and move on.
+ e.printStackTrace();
+ continue;
+ }
+ finally {
+ synchronized (incoming) {
+ incoming.notifyAll();
+ }
+ }
}
}
}
Modified: csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java
===================================================================
--- csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java 2009-10-10 01:16:33 UTC (rev 292)
+++ csidex/trunk/src/main/java/edu/asu/commons/net/SocketDispatcherWorker.java 2009-10-10 01:18:43 UTC (rev 293)
@@ -142,14 +142,15 @@
}
catch (ClassNotFoundException e) {
e.printStackTrace();
+ requestDisconnection(e);
running = false;
}
}
-
}
catch (Exception e) {
+ // runtime unhandled exception.
e.printStackTrace();
- dispatcher.getLocalEventHandler().handle(new DisconnectionRequest(id, e));
+ requestDisconnection(e);
}
}
}
Modified: csidex/trunk/src/main/java/edu/asu/commons/net/event/DisconnectionRequest.java
===================================================================
--- csidex/trunk/src/main/java/edu/asu/commons/net/event/DisconnectionRequest.java 2009-10-10 01:16:33 UTC (rev 292)
+++ csidex/trunk/src/main/java/edu/asu/commons/net/event/DisconnectionRequest.java 2009-10-10 01:18:43 UTC (rev 293)
@@ -31,4 +31,8 @@
public Exception getException() {
return exception;
}
+
+ public String toString() {
+ return "Disconnecting id " + id + " due to exception: " + exception;
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|