|
From: <tre...@us...> - 2007-11-12 17:37:13
|
Revision: 572
http://ogoglio.svn.sourceforge.net/ogoglio/?rev=572&view=rev
Author: trevorolio
Date: 2007-11-12 09:37:16 -0800 (Mon, 12 Nov 2007)
Log Message:
-----------
Fixed a bug in which the sim comet connection was dropped due to a heartbeat timing miscalculation, causing events generated via the web API to not make their way back to connected clients if there were no client-originated comet events. Hello, edge case.
Modified Paths:
--------------
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/sim/SimMessageHandler.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/sim/SpaceSimulator.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/MessageProxy.java
Modified: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/sim/SimMessageHandler.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/sim/SimMessageHandler.java 2007-11-12 17:37:13 UTC (rev 571)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/sim/SimMessageHandler.java 2007-11-12 17:37:16 UTC (rev 572)
@@ -55,6 +55,7 @@
if (!(message.getPayload() instanceof PayloadFactory.HeartbeatPayload)) {
Log.error("Somebody sent message with the 'heartbeat' space id:"+message.getPayload());
}
+
//Log.info("Sim is ignoring heartbeat from proxy.");
return;
}
Modified: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/sim/SpaceSimulator.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/sim/SpaceSimulator.java 2007-11-12 17:37:13 UTC (rev 571)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/sim/SpaceSimulator.java 2007-11-12 17:37:16 UTC (rev 572)
@@ -357,7 +357,6 @@
}
page.setContentType(event.getStringProperty(SpaceEvent.CONTENT_TYPE));
listener.generatedSpaceEvent(event, SpaceSimulator.this);
-
} else if (SpaceEvent.UPDATE_PAGE_CONTENT_EVENT.equals(event.getName())) {
long thingID = event.getLongProperty(SpaceEvent.THING_ID).longValue();
long pageID = event.getLongProperty(SpaceEvent.PAGE_ID).longValue();
@@ -371,7 +370,6 @@
}
page.setContentType(event.getStringProperty(SpaceEvent.CONTENT_TYPE));
listener.generatedSpaceEvent(event, SpaceSimulator.this);
-
} else if (SpaceEvent.TEXT_SAY_EVENT.equals(event.getName())) {
String username = event.getStringProperty(SpaceEvent.USERNAME);
User user = space.getUser(username);
Modified: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/MessageProxy.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/MessageProxy.java 2007-11-12 17:37:13 UTC (rev 571)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/MessageProxy.java 2007-11-12 17:37:16 UTC (rev 572)
@@ -63,20 +63,20 @@
private ClientMessageHandler clientMessageHandler = new ClientMessageHandler();
private Timer outgoingHeartbeatTimer = new Timer();
-
- private long lastIncomingHeartbeat = 0L;
+ private long lastSentSimHeartbeat = 0L;
+
private static final long HEARTBEAT_INTERVAL_TO_SIM = 5000;
-
+
//needed because we are CLIENT of the sim servlet and need to know how to reach it
private WebAPIDescriptor descriptor;
-
+
public MessageProxy(SessionFactory sessionFactory, WebAPIDescriptor descriptor) throws IOException {
ArgumentUtils.assertNotNull(sessionFactory);
this.sessionFactory = sessionFactory;
- this.descriptor=descriptor;
+ this.descriptor = descriptor;
channelServer = new NetworkChannelServer(clientMessageHandler, descriptor.getCometProxyURI(), true, this);
- Log.info("Started Message Proxy on port " + channelServer.getLocator().getPort()+" with target of "+descriptor.getCometSimURI());
+ Log.info("Started Message Proxy on port " + channelServer.getLocator().getPort() + " with target of " + descriptor.getCometSimURI());
outgoingHeartbeatTimer.schedule(new OutgoingHeartbeatTask(), 5000, 60000);
}
@@ -85,7 +85,7 @@
PayloadFactory.HeartbeatPayload payload = new PayloadFactory.HeartbeatPayload();
Object[] locators = locatorAuths.getKeys();
for (int i = 0; i < locators.length; i++) {
- Locator remoteLocator = (Locator)locators[i];
+ Locator remoteLocator = (Locator) locators[i];
Message message = new Message(getLocator(), remoteLocator, 1, payload);
try {
channelServer.sendMessage(message);
@@ -94,14 +94,14 @@
Log.error("Heartbeat failure to " + remoteLocator, e);
logout(remoteLocator);
} catch (NoSuchDestinationException e1) {
- Log.error("Could not logout locator (" +remoteLocator +") with failed heartbeat",e1);
+ Log.error("Could not logout locator (" + remoteLocator + ") with failed heartbeat", e1);
}
}
-
+
}
}
}
-
+
public void cleanup() {
channelServer.cleanup();
outgoingHeartbeatTimer.cancel();
@@ -120,7 +120,7 @@
return channelServer.getLocator().getPort();
}
*/
-
+
private class ClientMessageHandler implements MessageHandler {
public void handleMessage(Message request, TCPChannel sourceChannel) throws NoSuchDestinationException {
try {
@@ -153,46 +153,46 @@
}
SpaceRecord spaceRecord = SpacePersistTasks.findSpaceBySpaceID(payload.getSpaceID(), sessionFactory);
- if(spaceRecord == null) {
+ if (spaceRecord == null) {
Log.error("Got an auth message for an unknown space: " + payload.getSpaceID());
Message failureMessage = new Message(channelServer.getLocator(), request.getOrigin(), payload.getSpaceID(), new PayloadFactory.AuthenticationFailurePayload("Could not find that space."));
sourceChannel.sendMessage(failureMessage);
return;
}
-
+
SimRecord simRecord = SpacePersistTasks.findOrAssignSim(spaceRecord, sessionFactory);
- if(simRecord == null) { //Oh, crap. Couldn't assign a sim
+ if (simRecord == null) { //Oh, crap. Couldn't assign a sim
Log.error("Could not assign sim for space " + spaceRecord.getSpaceID());
Message failureMessage = new Message(channelServer.getLocator(), request.getOrigin(), payload.getSpaceID(), new PayloadFactory.AuthenticationFailurePayload("Could not find a simulator for that space."));
sourceChannel.sendMessage(failureMessage);
return;
}
-
- if(username.startsWith(WebConstants.GUEST_COOKIE_PREFIX)) {
+
+ if (username.startsWith(WebConstants.GUEST_COOKIE_PREFIX)) {
URI userListURI = WebAPIUtil.appendToURI(simRecord.getSimURI(), "space/" + spaceRecord.getSpaceID() + "/user/");
XMLElement element = new WebAPIClientWire().fetchAuthenticatedXML(userListURI, null);
- if(element == null) {
+ if (element == null) {
Log.error("Could not get space user count for guest: " + payload.getSpaceID());
Message failureMessage = new Message(channelServer.getLocator(), request.getOrigin(), payload.getSpaceID(), new PayloadFactory.AuthenticationFailurePayload("Error reading that space's user count."));
sourceChannel.sendMessage(failureMessage);
return;
}
- if(!"list".equals(element.getName())){
+ if (!"list".equals(element.getName())) {
Log.error("Could not get list of users for space user count for guest: " + payload.getSpaceID());
Message failureMessage = new Message(channelServer.getLocator(), request.getOrigin(), payload.getSpaceID(), new PayloadFactory.AuthenticationFailurePayload("Error reading that space's user list."));
sourceChannel.sendMessage(failureMessage);
return;
-
+
}
int userCount = element.getChildren(UserDocument.NAME).length;
- if(userCount >= spaceRecord.getMaxGuests()) {
+ if (userCount >= spaceRecord.getMaxGuests()) {
Log.error("Refused guest to space " + spaceRecord.getSpaceID() + " for reasons of max guest limit: " + spaceRecord.getMaxGuests());
Message failureMessage = new Message(channelServer.getLocator(), request.getOrigin(), payload.getSpaceID(), new PayloadFactory.AuthenticationFailurePayload("This space has reached its guest limit."));
sourceChannel.sendMessage(failureMessage);
return;
}
}
-
+
LocatorAuth locatorAuth = new LocatorAuth(payload.getSpaceID(), username, simRecord.getSimURI());
locatorAuths.put(request.getOrigin(), locatorAuth);
@@ -200,7 +200,7 @@
sourceChannel.sendMessage(message);
Message simMessage = new Message(channelServer.getLocator(), request.getOrigin(), payload.getSpaceID(), new PayloadFactory.AuthenticatedPayload(username));
- sendMessageToSpace(locatorAuth.uri,locatorAuth.spaceID, simMessage);
+ sendMessageToSpace(locatorAuth.uri, locatorAuth.spaceID, simMessage);
return;
} else if (request.getPayload() instanceof PayloadFactory.LogoutPayload) {
request.getPayload(); //XXX is this necessary?
@@ -219,12 +219,12 @@
event.setProperty(SpaceEvent.USERNAME, locatorAuth.username);
payload.setSpaceEvent(event);
- sendMessageToSpace(locatorAuth.uri,locatorAuth.spaceID, request);
+ sendMessageToSpace(locatorAuth.uri, locatorAuth.spaceID, request);
} else if (request.getPayload() instanceof PayloadFactory.HeartbeatPayload) {
- long now=System.currentTimeMillis();
- long diff = now-lastIncomingHeartbeat;
- lastIncomingHeartbeat=now;
- if (diff>HEARTBEAT_INTERVAL_TO_SIM) {
+ long now = System.currentTimeMillis();
+ long diff = now - lastSentSimHeartbeat;
+ if (diff > HEARTBEAT_INTERVAL_TO_SIM) {
+ lastSentSimHeartbeat = now;
heartbeatToSims();
}
} else {
@@ -242,15 +242,14 @@
return;
}
Message message = new Message(remoteLocator, getLocator(), locatorAuth.spaceID, new PayloadFactory.LoggedOutPayload(locatorAuth.username));
- sendMessageToSpace(locatorAuth.uri,locatorAuth.spaceID, message);
+ sendMessageToSpace(locatorAuth.uri, locatorAuth.spaceID, message);
}
private void heartbeatToSims() throws NoSuchDestinationException {
- //Log.info("Refreshing all sims...");
synchronized (simChannels) {
Object[] channels = (Object[]) simChannels.getValues();
- for (int i=0; i<channels.length;++i) {
- URI uri=(URI)simChannels.getBackward(channels[i]);
+ for (int i = 0; i < channels.length; ++i) {
+ URI uri = (URI) simChannels.getBackward(channels[i]);
Message message = new Message(channelServer.getLocator(), getLocator(), Sim.NO_SPACE_ID, new PayloadFactory.HeartbeatPayload());
sendMessageToSpace(uri, Sim.NO_SPACE_ID, message);
}
@@ -267,13 +266,13 @@
}
private void sendToUser(Message message) {
- final PayloadFactory.ProxiedSpaceEventPayload proxyPayload = (PayloadFactory.ProxiedSpaceEventPayload)message.getPayload();
+ final PayloadFactory.ProxiedSpaceEventPayload proxyPayload = (PayloadFactory.ProxiedSpaceEventPayload) message.getPayload();
Object[] spaceClientLocators = locatorAuths.getKeys(new TwoWayMap.Filter() {
public boolean matches(Object obj) {
return ((LocatorAuth) obj).username.equals(proxyPayload.getUsername());
}
});
-
+
PayloadFactory.SpaceEventPayload eventPayload = new PayloadFactory.SpaceEventPayload(proxyPayload.getSpaceEvent());
for (int i = 0; i < spaceClientLocators.length; i++) {
Locator remoteLocator = (Locator) spaceClientLocators[i];
@@ -281,30 +280,30 @@
try {
channelServer.sendMessage(individualMessage);
} catch (Throwable e) {
- Log.error("Could not send a message to client at " + remoteLocator,e);
+ Log.error("Could not send a message to client at " + remoteLocator, e);
}
}
}
-
+
//Distribute the sim message to all clients authed to that space
//TODO fix me: this is way way too expensive just to send a sim message
private void sendToAll(final Message message) {
Object[] auths = locatorAuths.getValues();
for (int i = 0; i < auths.length; i++) {
- LocatorAuth auth = (LocatorAuth)auths[i];
- if(auth.spaceID != message.getSpaceID()) {
- continue;
+ LocatorAuth auth = (LocatorAuth) auths[i];
+ if (auth.spaceID != message.getSpaceID()) {
+ continue;
}
Locator remoteLocator = (Locator) locatorAuths.getBackward(auth);
- if(remoteLocator == null) {
+ if (remoteLocator == null) {
continue;
}
Message individualMessage = new Message(getLocator(), remoteLocator, message.getSpaceID(), message.getPayload());
try {
channelServer.sendMessage(individualMessage);
} catch (Throwable e) {
- Log.error("Could not send a message to a client at " + remoteLocator,e);
+ Log.error("Could not send a message to a client at " + remoteLocator, e);
}
}
}
@@ -324,7 +323,7 @@
try {
/*Object selector=AsyncProtoFactory.getDefaultInfo().getSimSpecificSelector();
AsyncProto proto=AsyncProtoFactory.getDefaultClient(uri.getHost(), selector);*/
- AsyncProto proto=AsyncProtoFactory.getDefaultClient(descriptor, false);
+ AsyncProto proto = AsyncProtoFactory.getDefaultClient(descriptor, false);
simChannel = new TCPChannel(proto, simMessageHandler, false, simMessageHandler, true, "sim-client");
} catch (IOException e) {
e.printStackTrace();
@@ -360,7 +359,7 @@
public LocatorAuth(long spaceID, String username, URI uri) {
this.spaceID = spaceID;
this.username = username;
- this.uri=uri;
+ this.uri = uri;
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|