|
From: <ian...@us...> - 2007-10-16 14:23:29
|
Revision: 503
http://ogoglio.svn.sourceforge.net/ogoglio/?rev=503&view=rev
Author: iansmith
Date: 2007-10-16 07:23:32 -0700 (Tue, 16 Oct 2007)
Log Message:
-----------
More comet prep and some reorg.
Modified Paths:
--------------
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Command.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Message.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NoSuchDestinationException.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProto.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsync.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/sim/Sim.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/sim/SimMessageHandler.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/MessageProxy.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/SpaceServlet.java
maven/trunk/ogoglio-server/src/main/webapp/WEB-INF/web.xml
Added Paths:
-----------
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoInfo.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/Locator.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketInfo.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncClientReady.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoServerFactory.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoShutdownHandle.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometServlet.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/MeasPerfServer.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/NetworkChannelServer.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/SimpleSocketAsyncServer.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/SimpleSocketWaiterThread.java
Removed Paths:
-------------
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Locator.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NetworkChannelServer.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoServer.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsyncServer.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/CometServlet.java
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -39,6 +39,7 @@
import com.ogoglio.message.NoSuchDestinationException;
import com.ogoglio.message.PayloadFactory;
import com.ogoglio.message.TCPChannel;
+import com.ogoglio.message.proto.AsyncProtoFactory;
import com.ogoglio.util.ArgumentUtils;
import com.ogoglio.util.Log;
import com.ogoglio.util.WebConstants;
@@ -56,8 +57,6 @@
public class SpaceClient implements UserInputListener, Space.Context {
- public static int DEFAULT_EVENT_PORT = 43455;
-
private Listener listener = null;
private WebAPIClient webClient = null;
@@ -104,7 +103,9 @@
space = new Space(this, spaceDoc.getSpaceID(), spaceDoc.getDisplayName(), spaceDoc.getOwnerUsername(), spaceDoc.getDisplaySea(), spaceDoc.getSeaLevel());
//create the event channel and start queuing events
- messageChannel = new TCPChannel(serviceURI.getHost(), DEFAULT_EVENT_PORT, messenger, true, new ChannelListener());
+ Object selector = AsyncProtoFactory.getDefaultInfo().getProxySpecificSelector();
+ messageChannel = new TCPChannel(AsyncProtoFactory.getDefaultClient(serviceURI.getHost(), selector),
+ messenger, true, new ChannelListener());
messenger.authenticate(authCookie);
long startWait = System.currentTimeMillis();
@@ -406,7 +407,6 @@
if (!page.getContentType().equals(event.getStringProperty(SpaceEvent.CONTENT_TYPE))) {
page.setContentType(event.getStringProperty(SpaceEvent.CONTENT_TYPE));
}
- System.out.println("Page content type: " + page.getContentType());
page.reloadContent();
} else if (SpaceEvent.ADD_DOOR_EVENT.equals(event.getName())) {
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Command.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Command.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Command.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -59,10 +59,10 @@
throw new IllegalStateException("Cannot have a message length < 0: " + commandString);
}
} catch (NumberFormatException e) {
- throw new IllegalArgumentException("Bad command string (bad length): " + commandString);
+ throw new IllegalArgumentException("Bad command string (bad length): >" + commandString+"<");
}
} else {
- throw new IllegalArgumentException("Bad command string: " + commandString);
+ throw new IllegalArgumentException("Bad command string: >" + commandString+"<");
}
}
Deleted: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Locator.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Locator.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Locator.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -1,98 +0,0 @@
-/* Copyright 2007 Transmutable (http://transmutable.com/)
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. */
-
-package com.ogoglio.message;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-public class Locator {
-
- public static final String SCHEME = "og";
-
- private URI uri = null;
-
- public Locator(String locatorString) {
- try {
- uri = new URI(locatorString);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("Bad locatorString: " + locatorString);
- }
- if (uri.getPort() <= 0) {
- throw new IllegalArgumentException("bad port: " + uri.getPort());
- }
- if (!SCHEME.equals(uri.getScheme())) {
- throw new IllegalArgumentException("bad scheme: " + uri.getScheme());
- }
- }
-
- public Locator(String host, int port) {
- if (port <= 0) {
- throw new IllegalArgumentException("bad port: " + port);
- }
- try {
- uri = new URI(SCHEME + "://" + host + ":" + port + "/");
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("bad args: " + host + ", " + port);
- }
- }
-
- public String getHost() {
- return uri.getHost();
- }
-
- public void setPort(int port) {
- String newURI = uri.getScheme() + "://" + uri.getHost() + ":" + port + "/";
- try {
- uri = new URI(newURI);
- } catch (URISyntaxException e) {
- throw new IllegalStateException("Bad uri: " + newURI);
- }
- }
-
- public void setHost(String host) {
- if (host == null || host.trim().length() == 0) {
- throw new IllegalArgumentException("Bad host name: " + host);
- }
- String newURI = uri.getScheme() + "://" + host + ":" + uri.getPort() + "/";
- try {
- uri = new URI(newURI);
- } catch (URISyntaxException e) {
- throw new IllegalStateException("Bad uri: " + newURI);
- }
- }
-
- public int getPort() {
- return uri.getPort();
- }
-
- public String toString() {
- return uri.toString();
- }
-
- public boolean equals(Object object) {
- if (object == null || !(object instanceof Locator)) {
- return false;
- }
- return toString().equals(((Locator) object).toString());
- }
-
- public int hashCode() {
- return toString().hashCode();
- }
-
- public boolean matchesHostAndPort(Locator locator) {
- return getHost().equals(locator.getHost()) && getPort() == locator.getPort();
- }
-}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -0,0 +1,144 @@
+package com.ogoglio.message;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import com.ogoglio.message.proto.AsyncProto;
+import com.ogoglio.message.proto.AsyncProtoFactory;
+
+
+public class MeasPerfClient extends Thread implements MessageHandler, TCPChannel.Listener {
+
+ private static final double THRESHOLD_DANGER= 2.0; //make a note if it takes longer than this
+ private static final int MAX_SLEEP_TIME_MS= 5000; //avg 2.5secs
+ private static final int RAMP_UP_TIME= 250; //time btwn two thread creations
+ private static final int THRESHOLD_TERMINATE = 10; //we start culling senders if we see this many over THRESHOLD_DANGER
+
+ private TCPChannel channel;
+ private static Map timeMap=Collections.synchronizedMap(new HashMap());
+ private static Map senderMap=new HashMap();
+ private static List deathList=Collections.synchronizedList(new ArrayList());
+
+ private static double totalTime = 0.0;
+ private static double totalSamples=0.0;
+ private static Object totalTimeLock=new Object();
+
+ private static int warningCounter =0;
+ private static Object warningCounterLock = new Object();
+
+ private Random r=new Random();
+
+ public static void main(String[] argv) {
+ int n = Integer.parseInt(argv[0]);
+
+ try {
+ for (int i = 0; i < n; ++i) {
+ AsyncProto proto=AsyncProtoFactory.getDefaultClient(argv[1], AsyncProtoFactory.getDefaultInfo().getSimSpecificSelector());
+ Thread t =new MeasPerfClient(proto);
+ t.start();
+ try {
+ Thread.sleep(RAMP_UP_TIME);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public MeasPerfClient(AsyncProto proto) {
+ channel = new TCPChannel(proto,this,false,this);
+ }
+
+ public void run() {
+ while (true) {
+ Payload p ;
+ Message m;
+
+ try {
+ Long me=new Long(Thread.currentThread().getId());
+ while (true) {
+ p=new PayloadFactory.HeartbeatPayload();
+ long id=r.nextLong();
+ m=new Message(channel.getLocalLocator(),channel.getRemoteLocator(),id,p);
+ timeMap.put(new Long(id),new Long(System.currentTimeMillis()));
+ senderMap.put(new Long(id),me);
+ channel.sendMessage(m);
+ int ms=r.nextInt(MAX_SLEEP_TIME_MS);
+ Thread.yield();
+ Thread.sleep(ms);
+ synchronized (deathList) {
+ if (deathList.contains(me)) {
+ channel.cleanup();
+ return;
+ }
+ }
+ //did somebody close this?
+ if (channel==null) {
+ return;
+ }
+ }
+ } catch (NoSuchDestinationException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void handleMessage(Message message, TCPChannel sourceChannel) throws NoSuchDestinationException {
+ Long id=new Long(message.getSpaceID());
+ long now=System.currentTimeMillis();
+ if (!timeMap.containsKey(id)) {
+ System.out.println("Can't find id in time map! Ignoring id:"+id);
+ return;
+ }
+ Long before=(Long)timeMap.get(id);
+ long diff = now-before.longValue();
+ double secs = ((double)diff)/1000.0;
+ synchronized (totalTimeLock) {
+ totalTime+=secs;
+ totalSamples+=1.0;
+ }
+ if (r.nextInt(100)==0) {
+ double avg;
+ synchronized (totalTimeLock) {
+ avg=totalTime/totalSamples;
+ }
+ System.out.println(""+ channel.getLocalLocator() +" Sample Turnaround time on heartbeat:"+secs+" and avg:"+avg);
+ }
+ if (secs>THRESHOLD_DANGER) {
+ boolean cull=false;
+
+ synchronized (warningCounterLock) {
+ warningCounter++;
+ if (warningCounter>THRESHOLD_TERMINATE) {
+ cull=true;
+ warningCounter=0;
+ }
+ }
+ //not enough to convince us to blow up somebody?
+ if (!cull) {
+ return;
+ }
+ //ok, the sender of the message should die if it took too long...
+ Long die = (Long)senderMap.get(id);
+ synchronized (deathList) {
+ if (!deathList.contains(die)) {
+ System.out.println("Killing sender "+die+" because of too much time elapsed:"+secs);
+ deathList.add(die);
+ }
+ }
+ }
+ }
+
+ public void channelClosed(TCPChannel channel) {
+ System.out.println("Got a message about closing channel:"+channel.hashCode()+ " vs. "+this.channel.hashCode());
+ }
+}
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Message.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Message.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Message.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -16,6 +16,7 @@
import nanoxml.XMLElement;
+import com.ogoglio.message.proto.Locator;
import com.ogoglio.util.ArgumentUtils;
public class Message {
Deleted: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NetworkChannelServer.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NetworkChannelServer.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NetworkChannelServer.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -1,182 +0,0 @@
-/* Copyright 2007 Transmutable (http://transmutable.com/)
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. */
-
-package com.ogoglio.message;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Vector;
-
-import com.ogoglio.message.proto.AsyncProto;
-import com.ogoglio.message.proto.AsyncProtoFactory;
-import com.ogoglio.message.proto.AsyncProtoServer;
-import com.ogoglio.util.BlockingQueue;
-import com.ogoglio.util.Log;
-import com.ogoglio.util.NetworkUtils;
-
-public class NetworkChannelServer implements TCPChannel.Listener {
-
- private Vector channels = new Vector();
-
- private AsyncProtoServer serverProto = null;
-
- private ChannelSocketListenerThread listenerThread = new ChannelSocketListenerThread();
-
- private boolean cleaned = false;
-
- private MessageHandler messageHandler = null;
-
- private boolean ensureOrigin = false;
-
- private Listener listener;
-
- public NetworkChannelServer(MessageHandler messageHandler, int port, boolean ensureOrigin, Listener listener) {
- try {
- serverProto= AsyncProtoFactory.getDefaultServer(port);
- } catch (IOException e) {
- throw new IllegalStateException("Could not open a server socket: " + e);
- }
- this.messageHandler = messageHandler;
- this.ensureOrigin = ensureOrigin;
- if (listener == null) {
- throw new IllegalArgumentException("bad listener " + listener);
- }
- this.listener=listener;
- listenerThread.start();
- }
-
- public interface Listener {
- public void channelAdded(TCPChannel channel);
- public void channelRemoved(TCPChannel channel);
- }
-
- public void distribute(Payload payload, long spaceID) {
- TCPChannel[] channels = getChannels();
- for (int i = 0; i < channels.length; i++) {
- Message message = new Message(getLocator(), channels[i].getRemoteLocator(), spaceID, payload);
- try {
- channels[i].sendMessage(message);
- } catch (NoSuchDestinationException e) {
- e.printStackTrace();
- } catch (BlockingQueue.QueueClosedException e) {
- e.printStackTrace();
- }
- }
- }
-
- public void distributeExclusively(Payload payload, Locator excludedLocator) {
- TCPChannel[] channels = getChannels();
- for (int i = 0; i < channels.length; i++) {
- if (!channels[i].getRemoteLocator().matchesHostAndPort(excludedLocator)) {
- Message message = new Message(getLocator(), channels[i].getRemoteLocator(), -1, payload);
- try {
- channels[i].sendMessage(message);
- } catch (NoSuchDestinationException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- public synchronized void sendMessage(Message message) throws NoSuchDestinationException {
- Locator remoteLocator = message.getProxy() != null ? message.getProxy() : message.getDestination();
- TCPChannel[] channels = getChannels();
- for (int i = 0; i < channels.length; i++) {
- if (channels[i].getRemoteLocator().matchesHostAndPort(remoteLocator)) {
- channels[i].sendMessage(message);
- return;
- }
- }
-
- Log.info("Attempted location??: " + remoteLocator);
- //for (int i = 0; i < channels.length; i++) {
- // System.out.println("Available channels: " + channels[i].getRemoteLocator());
- //}
-
- throw new NoSuchDestinationException("Not local or for a connected channel: " + message);
- }
-
- public Locator getLocator() {
- return new Locator(NetworkUtils.getLocalHostAddress(), serverProto.getLocalPort());
- }
-
- public void cleanup() {
- cleaned = true;
- try {
- serverProto.shutdown();
- } catch (Exception e) {
- Log.info("Trying to close server socket of NCServer",e);
- // don't care
- }
- TCPChannel[] channels = getChannels();
- for (int i = 0; i < channels.length; i++) {
- channels[i].cleanup();
- }
- }
-
- private class ChannelSocketListenerThread extends Thread {
-
- public ChannelSocketListenerThread() {
- super("NetworkChannelServer");
- setDaemon(true);
- }
-
- public void run() {
- while (!cleaned) {
- try {
- AsyncProto clientProto = serverProto.waitForClient();
- if (clientProto == null) {
- break;
- }
- TCPChannel channel = new TCPChannel(clientProto, messageHandler, ensureOrigin, NetworkChannelServer.this);
- addChannel(channel);
- listener.channelAdded(channel);
- } catch (IOException e) {
- if (!cleaned) {
- e.printStackTrace();
- }
- break;
- }
- }
- if (!cleaned) {
- Log.error("Unclean client socket listener thread exiting.");
- }
- }
- }
-
- public synchronized void closeChannel(Locator remoteLocator) {
- TCPChannel[] channels = getChannels();
- for (int i = 0; i < channels.length; i++) {
- if (remoteLocator.equals(channels[i].getRemoteLocator())) {
- channels[i].cleanup();
- return;
- }
- }
- }
-
- private synchronized void addChannel(TCPChannel channel) {
- channels.add(channel);
- }
-
- public synchronized TCPChannel[] getChannels() {
- return (TCPChannel[]) channels.toArray(new TCPChannel[0]);
- }
-
- public synchronized void channelClosed(TCPChannel channel) {
- channels.remove(channel);
- listener.channelRemoved(channel);
- }
-
-}
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NoSuchDestinationException.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NoSuchDestinationException.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NoSuchDestinationException.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -14,6 +14,8 @@
package com.ogoglio.message;
+import com.ogoglio.message.proto.Locator;
+
public class NoSuchDestinationException extends Exception {
public NoSuchDestinationException(Locator destination) {
super("No such destination: " + destination.toString());
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -18,6 +18,7 @@
import com.ogoglio.message.proto.AsyncProto;
import com.ogoglio.message.proto.AsyncProtoFactory;
+import com.ogoglio.message.proto.Locator;
import com.ogoglio.util.Log;
import com.ogoglio.util.NetworkUtils;
import com.ogoglio.util.BlockingQueue.QueueClosedException;
@@ -27,9 +28,9 @@
private AsyncProto clientProto = null;
- private String remoteHostName = null;
+// private String remoteHostName = null;
- private int remoteHostPort = -1;
+ // private int remoteHostPort = -1;
private TCPMessageReader readerThread = null;
@@ -43,14 +44,14 @@
private boolean ensureOrigin = false;
- public TCPChannel(String remoteHost, int remotePort, MessageHandler messageHandler, boolean ensureOrigin, Listener listener) throws IOException {
- this(AsyncProtoFactory.getDefaultClient(remoteHost, remotePort), messageHandler, ensureOrigin, listener);
+ public TCPChannel(String remoteHost, Object selector, MessageHandler messageHandler, boolean ensureOrigin, Listener listener) throws IOException {
+ this(AsyncProtoFactory.getDefaultClient(remoteHost, selector), messageHandler, ensureOrigin, listener);
}
public TCPChannel(AsyncProto proto, MessageHandler message_handler, boolean ensureOrigin, Listener listener) {
this.clientProto= proto;
- remoteHostName = clientProto.getRemoteAddress().getAddress().getHostAddress();
- remoteHostPort = clientProto.getRemoteAddress().getPort();
+ //remoteHostName = clientProto.getRemoteAddress().getAddress().getHostAddress();
+ //remoteHostPort = clientProto.getRemoteAddress().getPort();
if (message_handler == null) {
throw new IllegalArgumentException("bad message handler " + message_handler);
}
@@ -70,16 +71,20 @@
}
+
+ public void clientReady(AsyncProto newlyConnectedProto) {
+ Log.info("Client connected from: "+newlyConnectedProto.getRemoteLocator());
+ }
public interface Listener {
public void channelClosed(TCPChannel channel);
}
public Locator getLocalLocator() {
- return new Locator(NetworkUtils.getLocalHostAddress(), clientProto.getLocalPort());
+ return clientProto.getLocalLocator();
}
public Locator getRemoteLocator() {
- return new Locator(remoteHostName, remoteHostPort);
+ return clientProto.getRemoteLocator();
}
public void cleanup() {
@@ -103,26 +108,26 @@
public void sendMessage(Message message) throws NoSuchDestinationException {
if(message.getProxy() != null) {
- if(message.getProxy().getPort() != remoteHostPort || !message.getProxy().getHost().equals(remoteHostName)){
+ if (!message.getProxy().equals(clientProto.getRemoteLocator())) {
throw new NoSuchDestinationException("Passed a message to a TCPChannel with the wrong proxy: " + message.getProxy());
}
- } else if (message.getDestination().getPort() != remoteHostPort || !message.getDestination().getHost().equals(remoteHostName)){
+ } else if (!message.getDestination().equals(clientProto.getRemoteLocator())){
throw new NoSuchDestinationException("Passed a message to a TCPChannel with the wrong destination: " + message.getDestination());
}
try {
senderQueue.sendMessage(message);
} catch (QueueOverflowException e) {
- Log.error("Queue overflow: " + remoteHostName + ":" + remoteHostPort,e);
+ Log.error("Queue overflow: " + clientProto.getRemoteLocator(),e);
cleanup();
} catch (QueueClosedException e) {
- Log.error("Queue closed: " + remoteHostName + ":" + remoteHostPort,e);
+ Log.error("Queue closed: " + clientProto.getRemoteLocator(),e);
cleanup();
}
}
public String toString() {
- return "TCPChannel from " + NetworkUtils.getLocalHostAddress() + ":" + clientProto.getLocalPort() + " to " + remoteHostName + ":" + remoteHostPort;
+ return "TCPChannel from " + clientProto.getLocalLocator() + " to " +clientProto.getRemoteLocator();
}
public void socketClosed() {
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -27,9 +27,9 @@
private TCPChannel channel = null;
- private String remoteHostName = null;
+ //private String remoteHostName = null;
- private int remotePort = -1;
+ //private int remotePort = -1;
private AsyncProto clientProto=null;
@@ -39,8 +39,8 @@
if (clientProto == null) {
throw new IllegalArgumentException("bad protocol to TCPMessageReader" + clientProto);
}
- remoteHostName = clientProto.getRemoteAddress().getAddress().getHostAddress();
- remotePort = clientProto.getRemoteAddress().getPort();
+ //remoteHostName = clientProto.getRemoteAddress().getAddress().getHostAddress();
+ //remotePort = clientProto.getRemoteAddress().getPort();
if (messageHandler == null) {
throw new IllegalArgumentException("bad message handler: " + messageHandler);
}
@@ -82,14 +82,12 @@
return;
}
command.reset(commandLine);
- if (!command.getType().equals(Command.MESSAGE)) {
- throw new IllegalStateException("Whoa! Bad message type!"+command.getType());
- }
String msg = clientProto.readString(command.getMessageLength());
Message message = Message.parseMessage(msg);
if (channel.ensureOrigin()) {
- message.getOrigin().setHost(remoteHostName);
- message.getOrigin().setPort(remotePort);
+ message.setOrigin(clientProto.getRemoteLocator());
+ //message.getOrigin().setHost(remoteHostName);
+ //message.getOrigin().setPort(remotePort);
}
try {
messageHandler.handleMessage(message, channel);
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProto.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProto.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProto.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -17,11 +17,11 @@
/**
* The caller needs to be able to get at the endpoint to which this proto is speaking.
*/
- public InetSocketAddress getRemoteAddress();
+ public Locator getRemoteLocator();
/**
- * The caller needs to get the port we are talking on.
+ * The caller needs to be able to get our own name.
*/
- public int getLocalPort();
+ public Locator getLocalLocator();
/**
* Deal with various shutdown issues.
*/
@@ -41,12 +41,14 @@
*/
public String readString(int length) throws IOException;
/**
- * Insure that we are ready for writing to the output.
+ * Insure that we are ready for writing to the output.It is required this be called before
+ * attempting to send.
* @throws IOException
*/
public void prepareOutput() throws IOException;
/**
- * Insure that we are ready for reading from the input.
+ * Insure that we are ready for reading from the input. It is required that this be called before
+ * attempting to read.
* @throws IOException
*/
public void prepareInput() throws IOException;
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -1,35 +1,32 @@
package com.ogoglio.message.proto;
import java.io.IOException;
-import java.net.ServerSocket;
/**
* This is a factory for other classes to get a handle on our async protocol handler and so
- * we can "throw the switch" in exactly one place if we want to switch protocols.
+ * we can "throw the switch" in exactly one place if we want to switch protocols.
*
+ * Clients call the getDefaultClient() routine and servers call the waitForClient() routine.
+ *
* @author iansmith
*
*/
public class AsyncProtoFactory {
/**
- * Get a handler for this protocol. This is the client side call.
+ * Get a handler for this protocol. This is the client side call. This fails if the connection
+ * cannot be made. It returns a connected proto object.
*
* @param host
- * @param port
+ * @param selector a proto-specific object
* @return
* @throws IOException
*/
- public static AsyncProto getDefaultClient(String host, int port) throws IOException {
- return new SimpleSocketAsync(host,port);
+ public static AsyncProto getDefaultClient(String host, Object selector) throws IOException {
+ return new SimpleSocketAsync(host, ((Integer)selector).intValue());
}
- /**
- * Become a serevr who waits for connections.
- *
- * If we are going to wait on a port, you pass it here. Note that you may not end up waiting
- * on that port if the underlying protocol doesn't need to.
- */
- public static AsyncProtoServer getDefaultServer(int port) throws IOException {
- return new SimpleSocketAsyncServer(port);
+
+ public static AsyncProtoInfo getDefaultInfo() {
+ return new SimpleSocketInfo();
}
}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoInfo.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoInfo.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoInfo.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -0,0 +1,19 @@
+package com.ogoglio.message.proto;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * The intention of this interface is to hide various specifics about how the underlying async
+ * protocol is implemented. The objects returned here should be considered opaque.
+ *
+ * @author iansmith
+ *
+ */
+public interface AsyncProtoInfo {
+ public String getScheme();
+ public Object getProxySpecificSelector();
+ public Object getSimSpecificSelector();
+
+ public URI getURI(String host, Object selector) throws URISyntaxException;
+}
Deleted: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoServer.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoServer.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoServer.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -1,20 +0,0 @@
-package com.ogoglio.message.proto;
-
-import java.io.IOException;
-
-public interface AsyncProtoServer {
- /**
- * Wait for a client to show up and connect to us. If he does, return the object of the right type.
- */
- public AsyncProto waitForClient() throws IOException ;
-
- /**
- * Clean up
- */
- public void shutdown() throws IOException;
-
- /**
- * Get the local port, if there is one
- */
- public int getLocalPort();
-}
\ No newline at end of file
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -9,11 +9,8 @@
import java.io.PrintWriter;
import java.net.HttpURLConnection;
import java.net.Socket;
-import java.net.URL;
-import java.net.URLConnection;
import com.ogoglio.message.MessageHandler;
-import com.ogoglio.message.TCPChannel;
import com.ogoglio.message.TCPChannel.Listener;
public class CometClient {
Copied: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/Locator.java (from rev 483, maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/Locator.java)
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/Locator.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/Locator.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -0,0 +1,90 @@
+/* Copyright 2007 Transmutable (http://transmutable.com/)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. */
+
+package com.ogoglio.message.proto;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+
+public class Locator {
+
+ public static final AsyncProtoInfo info = AsyncProtoFactory.getDefaultInfo();
+
+ private URI uri = null;
+
+ public Locator(String locatorString) {
+ try {
+ uri = new URI(locatorString);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Bad locatorString: " + locatorString);
+ }
+ if (uri.getPort() <= 0) {
+ throw new IllegalArgumentException("bad port: " + uri.getPort());
+ }
+ if (!info.getScheme().equals(uri.getScheme())) {
+ throw new IllegalArgumentException("bad scheme: " + uri.getScheme());
+ }
+ }
+
+ public String getHost() {
+ return uri.getHost();
+ }
+
+ /*
+ public void setPort(int port) {
+ String newURI = uri.getScheme() + "://" + uri.getHost() + ":" + port + "/";
+ try {
+ uri = new URI(newURI);
+ } catch (URISyntaxException e) {
+ throw new IllegalStateException("Bad uri: " + newURI);
+ }
+ }
+
+ public void setHost(String host) {
+ if (host == null || host.trim().length() == 0) {
+ throw new IllegalArgumentException("Bad host name: " + host);
+ }
+ String newURI = uri.getScheme() + "://" + host + ":" + uri.getPort() + "/";
+ try {
+ uri = new URI(newURI);
+ } catch (URISyntaxException e) {
+ throw new IllegalStateException("Bad uri: " + newURI);
+ }
+
+ }
+ */
+ public int getPort() {
+ return uri.getPort();
+ }
+
+ public String toString() {
+ return uri.toString();
+ }
+
+ public boolean equals(Object object) {
+ if (object == null || !(object instanceof Locator)) {
+ return false;
+ }
+ return toString().equals(((Locator) object).toString());
+ }
+
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ public boolean matchesHostAndPort(Locator locator) {
+ return getHost().equals(locator.getHost()) && getPort() == locator.getPort();
+ }
+}
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsync.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsync.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsync.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -7,6 +7,7 @@
import java.net.Socket;
import com.ogoglio.util.Log;
+import com.ogoglio.util.NetworkUtils;
/**
* This is a simple impementation of a custom async proto that works on a standard TCP
@@ -39,14 +40,25 @@
this.socket=socket;
}
- public InetSocketAddress getRemoteAddress() {
- return (InetSocketAddress) socket.getRemoteSocketAddress();
+ public Locator getRemoteLocator() {
+ InetSocketAddress addr=(InetSocketAddress) socket.getRemoteSocketAddress();
+ return sockAddrToScheme(addr);
}
- public int getLocalPort() {
- return socket.getLocalPort();
+ public Locator getLocalLocator() {
+ InetSocketAddress addr=(InetSocketAddress) socket.getLocalSocketAddress();
+ return sockAddrToScheme(addr);
}
+ private Locator sockAddrToScheme(InetSocketAddress addr) {
+ String scheme = AsyncProtoFactory.getDefaultInfo().getScheme();
+ return new Locator(scheme +"://"+addr.getHostName()+":"+addr.getPort());
+ }
+
+ public Object getSelector() {
+ return new Integer(socket.getLocalPort());
+ }
+
public void shutdown() {
try {
if (socket != null) {
@@ -74,7 +86,7 @@
throw new IllegalStateException("Command exceeds max length: " + commandBuffer);
}
char inChar = (char) inInt;
- if (inChar == '\r') { // this shouldn't happen, but people are silly
+ if (inChar == '\r') {
continue;
}
if (inChar != '\n') {
Deleted: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsyncServer.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsyncServer.java 2007-10-15 19:10:23 UTC (rev 502)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsyncServer.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -1,37 +0,0 @@
-package com.ogoglio.message.proto;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-public class SimpleSocketAsyncServer implements AsyncProtoServer {
-
- private ServerSocket serverSocket;
-
- public SimpleSocketAsyncServer(int port) throws IOException {
- if (port == -1) {
- serverSocket = new ServerSocket(0);
- } else {
- serverSocket = new ServerSocket(port);
- }
- }
- /**
- * Wait for a client to show up and connect to us. If he does, return the object of the right type.
- */
- public AsyncProto waitForClient() throws IOException {
- Socket socket=serverSocket.accept();
- return new SimpleSocketAsync(socket);
- }
- /**
- * Clean up
- */
- public void shutdown() throws IOException {
- serverSocket.close();
- }
- /**
- * Get the local port, if there is one
- */
- public int getLocalPort() {
- return serverSocket.getLocalPort();
- }
-}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketInfo.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketInfo.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketInfo.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -0,0 +1,30 @@
+package com.ogoglio.message.proto;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+
+public class SimpleSocketInfo implements AsyncProtoInfo {
+
+ private static final int DEFAULT_SIM_PORT=8932;
+ private static final int DEFAULT_PROXY_PORT=43092;
+
+ private static final String SCHEME = "og";
+
+ public Object getProxySpecificSelector() {
+ return new Integer(DEFAULT_PROXY_PORT);
+ }
+
+ public String getScheme() {
+ return SCHEME;
+ }
+
+ public Object getSimSpecificSelector() {
+ return new Integer(DEFAULT_SIM_PORT);
+ }
+
+ public URI getURI(String host, Object selector) throws URISyntaxException {
+ return new URI(getScheme() + "://" + host + ":" + selector+ "/");
+ }
+
+}
Added: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncClientReady.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncClientReady.java (rev 0)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncClientReady.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -0,0 +1,18 @@
+package com.ogoglio.message.server;
+
+import com.ogoglio.message.proto.AsyncProto;
+
+/**
+ * Pass an instance of this to the waitForClient() of the factory and when a client connects,
+ * we will call back to you via this object.
+ *
+ * However, this *will* be called from a different thread than the caller so be sure to take
+ * care to synchronize as needed.
+ *
+ * @author iansmith
+ *
+ */
+public interface AsyncClientReady {
+
+ public void clientReady(AsyncProto newlyConnectedProto);
+}
Added: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoServerFactory.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoServerFactory.java (rev 0)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoServerFactory.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -0,0 +1,37 @@
+package com.ogoglio.message.server;
+
+import java.io.IOException;
+
+import com.ogoglio.message.proto.AsyncProtoInfo;
+import com.ogoglio.message.proto.SimpleSocketInfo;
+
+
+public class AsyncProtoServerFactory {
+ /**
+ * Become a server who waits for connections.
+ *
+ * If we are going to wait on a port, you pass it here. Note that you may not end up waiting
+ * on that port if the underlying protocol doesn't need to.
+ *
+ * This hands back a handle immediately, it does not block. The handle returned can be
+ * used when you want to not be a server anymore and want to clean up the resources.
+ */
+ public static AsyncProtoShutdownHandle waitForClient(Object serverSelector, AsyncClientReady ready) throws IOException {
+ return simpleSocketImpl(((Integer)serverSelector).intValue(),ready);
+ }
+
+ public static AsyncProtoInfo getInfo() {
+ return simpleSocketInfo();
+ }
+
+ private static AsyncProtoInfo simpleSocketInfo() {
+ return new SimpleSocketInfo();
+ }
+
+ private static AsyncProtoShutdownHandle simpleSocketImpl(int port, AsyncClientReady ready) throws IOException {
+ SimpleSocketWaiterThread waiter = new SimpleSocketWaiterThread(port,ready);
+ waiter.start();
+ return waiter.getShutdownHandle();
+ }
+
+}
Added: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoShutdownHandle.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoShutdownHandle.java (rev 0)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoShutdownHandle.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -0,0 +1,16 @@
+package com.ogoglio.message.server;
+
+import com.ogoglio.message.proto.Locator;
+
+public interface AsyncProtoShutdownHandle {
+
+ /*
+ * This allows you to see inside the object for info about what it's waiting on.
+ */
+ public Locator getLocalLocator();
+
+ /**
+ * This cleans up the resources behind the API if they are any that need it.
+ */
+ public void shutdown();
+}
Copied: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometServlet.java (from rev 490, maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/CometServlet.java)
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometServlet.java (rev 0)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometServlet.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -0,0 +1,165 @@
+package com.ogoglio.message.server;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.catalina.CometEvent;
+import org.apache.catalina.CometProcessor;
+
+import com.ogoglio.util.Log;
+
+public class CometServlet extends HttpServlet implements CometProcessor {
+
+ protected ArrayList connections = new ArrayList();
+
+ protected MessageSender messageSender = null;
+
+ public void init() throws ServletException {
+ messageSender = new MessageSender();
+ Thread messageSenderThread = new Thread(messageSender, "MessageSender[" + getServletContext() + "]");
+ messageSenderThread.setDaemon(true);
+ messageSenderThread.start();
+ }
+
+ public void destroy() {
+ connections.clear();
+ messageSender.stop();
+ messageSender = null;
+ }
+
+ /**
+ * Process the given Comet event.
+ *
+ * @param event The Comet event that will be processed
+ * @throws IOException
+ * @throws ServletException
+ */
+ public void event(CometEvent event) throws IOException, ServletException {
+ HttpServletRequest request = event.getHttpServletRequest();
+ HttpServletResponse response = event.getHttpServletResponse();
+ if (event.getEventType() == CometEvent.EventType.BEGIN) {
+ //event.setTimeout(60000);
+ Log.info("Begin for session: " + request.getSession(true).getId());
+ PrintWriter writer = response.getWriter();
+ response.setContentType("text/plain");
+ writer.println("<!doctype html public \"-//w3c//dtd html 4.0 transitional//en\">");
+ writer.println("<head><title>JSP Chat</title></head><body bgcolor=\"#FFFFFF\">");
+ writer.flush();
+ synchronized (connections) {
+ connections.add(response);
+ }
+ //drainDataBuffer(event, request);
+ } else if (event.getEventType() == CometEvent.EventType.ERROR) {
+ Log.error("Error for session: " + request.getSession(true).getId()+"-->"+event.getEventSubType());
+ if (event.getEventSubType()==CometEvent.EventSubType.TIMEOUT) {
+ Log.info("Timeout ignored!");
+ return;
+ }
+ synchronized (connections) {
+ connections.remove(response);
+ }
+ event.close();
+ } else if (event.getEventType() == CometEvent.EventType.END) {
+ Log.info("End for session: " + request.getSession(true).getId());
+ synchronized (connections) {
+ connections.remove(response);
+ }
+ PrintWriter writer = response.getWriter();
+ writer.println("</body></html>");
+ event.close();
+ } else if (event.getEventType() == CometEvent.EventType.READ) {
+ drainDataBuffer(event, request);
+ }
+ }
+
+ private void drainDataBuffer(CometEvent event, HttpServletRequest request) throws IOException {
+ InputStream is = request.getInputStream();
+ byte[] buf = new byte[512];
+ do {
+ int n = is.read(buf); //can throw an IOException
+ if (n > 0) {
+ Log.info("Read " + n + " bytes: " + new String(buf, 0, n) + " for session: " + request.getSession(true).getId());
+ String msg=new String(buf);
+ String sender=msg.substring(0,msg.indexOf(':'));
+ String message=msg.substring(msg.indexOf(':')+1);
+ messageSender.send(sender, message);
+ } else if (n < 0) {
+ Log.error("Read error:"+event.getEventType()+"," +request.getRequestURI()+","+n);
+ return;
+ }
+ } while (is.available() > 0);
+ }
+
+ public class MessageSender implements Runnable {
+
+ protected boolean running = true;
+
+ protected ArrayList messages = new ArrayList();
+
+ public MessageSender() {
+ }
+
+ public void stop() {
+ running = false;
+ }
+
+ /**
+ * Add message for sending.
+ */
+ public void send(String user, String message) {
+ Log.info("FART:adding"+message +" from "+user);
+ synchronized (messages) {
+ messages.add("[" + user + "]: " + message);
+ messages.notify();
+ }
+ }
+
+ public void run() {
+
+ while (running) {
+
+ if (messages.size() == 0) {
+ try {
+ synchronized (messages) {
+ messages.wait();
+ }
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+
+ synchronized (connections) {
+ String[] pendingMessages = null;
+ synchronized (messages) {
+ pendingMessages = (String[]) messages.toArray(new String[0]);
+ messages.clear();
+ }
+ // Send any pending message on all the open connections
+ Log.info("FART currently need to send on "+connections.size()+" connections with "+pendingMessages.length);
+ for (int i = 0; i < connections.size(); i++) {
+ try {
+ PrintWriter writer = ((HttpServletResponse) connections.get(i)).getWriter();
+ for (int j = 0; j < pendingMessages.length; j++) {
+ writer.println(pendingMessages[j] + "<br>");
+ }
+ writer.flush();
+ } catch (IOException e) {
+ Log.error("IOExeption sending message", e);
+ }
+ }
+ }
+
+ }
+
+ }
+
+ }
+
+}
Added: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/MeasPerfServer.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/MeasPerfServer.java (rev 0)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/MeasPerfServer.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -0,0 +1,48 @@
+package com.ogoglio.message.server;
+
+import com.ogoglio.message.Message;
+import com.ogoglio.message.MessageHandler;
+import com.ogoglio.message.NoSuchDestinationException;
+import com.ogoglio.message.TCPChannel;
+import com.ogoglio.message.proto.AsyncProtoInfo;
+import com.ogoglio.message.server.NetworkChannelServer.Listener;
+
+
+public class MeasPerfServer implements MessageHandler, NetworkChannelServer.Listener {
+
+ /**
+ * Bootstrap to object's main
+ */
+ public static void main(String[] argv) {
+
+ new MeasPerfServer().main();
+
+ }
+
+ private NetworkChannelServer ncs;
+
+ public void main() {
+ try {
+ AsyncProtoInfo info = AsyncProtoServerFactory.getInfo();
+ ncs = new NetworkChannelServer(this,info.getSimSpecificSelector(),false,this);
+ while (true) {
+ System.out.println("Waiting for connections...");
+ Thread.sleep(30000);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void handleMessage(Message message, TCPChannel sourceChannel) throws NoSuchDestinationException {
+ ncs.distribute(message.getPayload(), message.getSpaceID());
+ }
+
+ public void channelAdded(TCPChannel channel) {
+ System.out.println("Added a TCP Channel:"+channel.getRemoteLocator()+" ["+ncs.getChannels().length+"]");
+ }
+
+ public void channelRemoved(TCPChannel channel) {
+ System.out.println("Removed a TCP Channel:"+channel.getRemoteLocator()+" ["+ncs.getChannels().length+"]");
+ }
+}
Copied: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/NetworkChannelServer.java (from rev 490, maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NetworkChannelServer.java)
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/NetworkChannelServer.java (rev 0)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/NetworkChannelServer.java 2007-10-16 14:23:32 UTC (rev 503)
@@ -0,0 +1,155 @@
+/* Copyright 2007 Transmutable (http://transmutable.com/)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. */
+
+package com.ogoglio.message.server;
+
+import java.io.IOException;
+import java.util.Vector;
+
+import com.ogoglio.message.Message;
+import com.ogoglio.message.MessageHandler;
+import com.ogoglio.message.NoSuchDestinationException;
+import com.ogoglio.message.Payload;
+import com.ogoglio.message.TCPChannel;
+import com.ogoglio.message.proto.AsyncProto;
+import com.ogoglio.message.proto.Locator;
+import com.ogoglio.util.BlockingQueue;
+import com.ogoglio.util.Log;
+import com.ogoglio.util.NetworkUtils;
+
+public class NetworkChannelServer implements TCPChannel.Listener, AsyncClientReady {
+
+ private Vector channels = new Vector();
+
+ private AsyncProtoShutdownHandle handle= null;
+
+ private MessageHandler messageHandler = null;
+
+ private boolean ensureOrigin = false;
+
+ private Listener listener;
+
+ public NetworkChannelServer(MessageHandler messageHandler, Object serverSelector, boolean ensureOrigin, Listener listener) {
+ try {
+ handle=AsyncProtoServerFactory.waitForClient(serverSelector, this);
+ } catch (IOException e) {
+ throw new IllegalStateException("Could not open a server socket: " + e);
+ }
+ this.messageHandler = messageHandler;
+ this.ensureOrigin = ensureOrigin;
+ if (listener == null) {
+ throw new IllegalArgumentException("bad listener " + listener);
+ }
+ this.listener=listener;
+ }
+
+ public interface Listener {
+ public void channelAdded(TCPChannel channel);
+ public void channelRemoved(TCPChannel channel);
+ }
+
+ public void distribute(Payload payload, long spaceID) {
+ TCPChannel[] channels = getChannels();
+ for (int i = 0; i < channels.length; i++) {
+ Message message = new Message(getLocator(), channels[i].getRem...
[truncated message content] |