|
From: <ian...@us...> - 2007-10-27 23:33:45
|
Revision: 543
http://ogoglio.svn.sourceforge.net/ogoglio/?rev=543&view=rev
Author: iansmith
Date: 2007-10-27 16:33:47 -0700 (Sat, 27 Oct 2007)
Log Message:
-----------
More comet support, currently turned off. No external effect.
Modified Paths:
--------------
maven/trunk/dev-plugins/src/main/java/com/ogoglio/plugin/VerifyEnvironmentMojo.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoInfo.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketInfo.java
maven/trunk/ogoglio-server/pom.xml
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoServerFactory.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometServlet.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/MeasPerfServer.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/NetworkChannelServer.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/SimpleSocketAsyncServer.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/MessageProxy.java
Added Paths:
-----------
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometInfo.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometProto.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometChannelManager.java
Modified: maven/trunk/dev-plugins/src/main/java/com/ogoglio/plugin/VerifyEnvironmentMojo.java
===================================================================
--- maven/trunk/dev-plugins/src/main/java/com/ogoglio/plugin/VerifyEnvironmentMojo.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/dev-plugins/src/main/java/com/ogoglio/plugin/VerifyEnvironmentMojo.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -8,7 +8,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import nanoxml.XMLElement;
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -57,6 +57,8 @@
public class SpaceClient implements UserInputListener, Space.Context {
+ private static final long START_UP_WAIT_MS = 15000;
+
private Listener listener = null;
private WebAPIClient webClient = null;
@@ -104,11 +106,11 @@
//create the event channel and start queuing events
Object selector = AsyncProtoFactory.getDefaultInfo().getProxySpecificSelector();
- messageChannel = new TCPChannel(AsyncProtoFactory.getDefaultClient(serviceURI.getHost(), selector), messenger, true, new ChannelListener());
+ messageChannel = new TCPChannel(AsyncProtoFactory.getDefaultClient(serviceURI.getHost(), selector), messenger, true, new ChannelListener(), true);
messenger.authenticate(authCookie);
long startWait = System.currentTimeMillis();
- while (System.currentTimeMillis() < startWait + 45000 && messenger.authStatus == messenger.INIT_STATUS) {
+ while (System.currentTimeMillis() < startWait + START_UP_WAIT_MS && messenger.authStatus == messenger.INIT_STATUS) {
try {
Thread.sleep(100);
} catch (Exception e) {
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -38,7 +38,8 @@
try {
for (int i = 0; i < n; ++i) {
- AsyncProto proto=AsyncProtoFactory.getDefaultClient(argv[1], AsyncProtoFactory.getDefaultInfo().getSimSpecificSelector());
+// AsyncProto proto=AsyncProtoFactory.getDefaultClient(argv[1], AsyncProtoFactory.getDefaultInfo().getSimSpecificSelector());
+ AsyncProto proto=AsyncProtoFactory.getDefaultClient(argv[1], "/fart/poop");
Thread t =new MeasPerfClient(proto);
t.start();
try {
@@ -53,7 +54,8 @@
}
public MeasPerfClient(AsyncProto proto) {
- channel = new TCPChannel(proto,this,false,this);
+ System.out.println("FOUND A CLIENT PROTO:"+(proto!=null));
+ channel = new TCPChannel(proto,this,false,this,true);
}
public void run() {
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -15,8 +15,6 @@
package com.ogoglio.message;
import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
import com.ogoglio.message.proto.AsyncProto;
import com.ogoglio.util.BlockingQueue;
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -14,13 +14,9 @@
package com.ogoglio.message;
-import java.io.IOException;
-
import com.ogoglio.message.proto.AsyncProto;
-import com.ogoglio.message.proto.AsyncProtoFactory;
import com.ogoglio.message.proto.Locator;
import com.ogoglio.util.Log;
-import com.ogoglio.util.NetworkUtils;
import com.ogoglio.util.BlockingQueue.QueueClosedException;
import com.ogoglio.util.BlockingQueue.QueueOverflowException;
@@ -44,11 +40,8 @@
private boolean ensureOrigin = false;
- public TCPChannel(String remoteHost, Object selector, MessageHandler messageHandler, boolean ensureOrigin, Listener listener) throws IOException {
- this(AsyncProtoFactory.getDefaultClient(remoteHost, selector), messageHandler, ensureOrigin, listener);
- }
-
- public TCPChannel(AsyncProto proto, MessageHandler message_handler, boolean ensureOrigin, Listener listener) {
+ public TCPChannel(AsyncProto proto, MessageHandler message_handler, boolean ensureOrigin,
+ Listener listener, boolean needAReaderThread) {
this.clientProto= proto;
//remoteHostName = clientProto.getRemoteAddress().getAddress().getHostAddress();
//remoteHostPort = clientProto.getRemoteAddress().getPort();
@@ -66,9 +59,16 @@
senderQueue = new SenderQueue(clientProto, 1000); //TODO what should the max queue size be?
senderQueue.start();
- readerThread = new TCPMessageReader(clientProto, message_handler, this);
- readerThread.start();
+ //this is a wee bit hairy. all clients need a reader thread. servers need a reader
+ //thread depending on their protocol.
+ Log.info("Comet got into a TCP channel:"+needAReaderThread);
+ if (needAReaderThread) {
+ readerThread = new TCPMessageReader(clientProto, message_handler, this);
+ readerThread.start();
+ }
+
+
}
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -6,13 +6,12 @@
* This is a factory for other classes to get a handle on our async protocol handler and so
* we can "throw the switch" in exactly one place if we want to switch protocols.
*
- * Clients call the getDefaultClient() routine and servers call the waitForClient() routine.
- *
* @author iansmith
*
*/
public class AsyncProtoFactory {
-
+ //this is the magic switch
+ private static final boolean USE_SIMPLE_SOCKET=true;
/**
* Get a handler for this protocol. This is the client side call. This fails if the connection
* cannot be made. It returns a connected proto object.
@@ -23,10 +22,38 @@
* @throws IOException
*/
public static AsyncProto getDefaultClient(String host, Object selector) throws IOException {
+ if (USE_SIMPLE_SOCKET) {
+ return simpleSocketProto(host, selector);
+ } else {
+ return cometProto(host,(String)selector);
+ }
+ }
+
+ private static AsyncProto cometProto(String host, String selector) throws IOException {
+ return new CometClient(host,selector).getProto();
+ }
+
+ private static AsyncProto simpleSocketProto(String host, Object selector) throws IOException {
+ if (USE_SIMPLE_SOCKET) {
+ return simpleSocketInfo(host, selector);
+ } else {
+ return cometProto(host,(String)selector);
+ }
+ }
+
+ private static AsyncProtoInfo cometInfo() {
+ return new CometInfo();
+ }
+
+ private static AsyncProto simpleSocketInfo(String host, Object selector) throws IOException {
return new SimpleSocketAsync(host, ((Integer)selector).intValue());
}
public static AsyncProtoInfo getDefaultInfo() {
- return new SimpleSocketInfo();
+ if (USE_SIMPLE_SOCKET) {
+ return new SimpleSocketInfo();
+ } else {
+ return new CometInfo();
+ }
}
}
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoInfo.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoInfo.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoInfo.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -1,7 +1,5 @@
package com.ogoglio.message.proto;
-import java.net.URI;
-import java.net.URISyntaxException;
/**
* The intention of this interface is to hide various specifics about how the underlying async
@@ -13,7 +11,5 @@
public interface AsyncProtoInfo {
public String getScheme();
public Object getProxySpecificSelector();
- public Object getSimSpecificSelector();
-
- public URI getURI(String host, Object selector) throws URISyntaxException;
+ public Object getSimSpecificSelector();
}
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -7,7 +7,9 @@
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+import java.io.Writer;
import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
import java.net.Socket;
import com.ogoglio.message.MessageHandler;
@@ -25,13 +27,49 @@
private static final String CRLF="\r\n";
private static final String EOC = "0"+CRLF+CRLF; //end of chunk
+ public static final int XXX_PORT=8080;
+ public static final String XXX_MOUNT_POINT="/og/comet";
+
+ private Locator remote;
+ private Locator local;
+
+ public CometClient(String remoteHost, String selector) throws IOException {
+ Socket socket=new Socket(remoteHost,XXX_PORT); ///UGH
+ is = socket.getInputStream();
+ os = socket.getOutputStream();
+
+ StringBuffer buff=new StringBuffer();
+ buff.append("POST "+XXX_MOUNT_POINT+selector+" HTTP/1.1"+CRLF);
+ buff.append("content-type: text/plain"+CRLF);
+ buff.append("host: "+remoteHost+":"+XXX_PORT+CRLF);
+ buff.append("connection: keep-alive"+CRLF);
+ buff.append("user-agent: ogoglio/viewer"+CRLF);
+ //if you omit this, the server generates the "end" immediately
+ buff.append("content-length: 17"+CRLF);
+ //buff.append("accept= text/plain"+CRLF);
+ buff.append("method: POST"+CRLF);
+ //buff.append("transfer-encoding= chunked"+CRLF);
+ buff.append(CRLF);
+ os.write(buff.toString().getBytes());
+ os.flush();
+
+ String scheme = new CometInfo().getScheme();
+ remote = new Locator(scheme+"://"+remoteHost+":"+XXX_PORT+XXX_MOUNT_POINT+selector);
+ InetSocketAddress addr=(InetSocketAddress)socket.getLocalSocketAddress();
+ local = new Locator(scheme+"://UNKNOWN@"+addr.getHostName()+":"+addr.getPort()+"/");
+ }
+
+ public CometProto getProto() {
+ return new CometProto(new PrintWriter(os), is, remote, local);
+ }
+
public CometClient(String remoteHost, int remotePort, MessageHandler messageHandler, boolean ensureOrigin, Listener listener) throws IOException {
Socket socket=new Socket(remoteHost,remotePort);
is = socket.getInputStream();
os = socket.getOutputStream();
StringBuffer buff=new StringBuffer();
- buff.append("POST /og/comet HTTP/1.1"+CRLF);
+ buff.append("POST /og/comet/fart/poop HTTP/1.1"+CRLF);
buff.append("content-type: text/plain"+CRLF);
buff.append("host: "+remoteHost+":"+remotePort+CRLF);
buff.append("connection: keep-alive"+CRLF);
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometInfo.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometInfo.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometInfo.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -0,0 +1,21 @@
+package com.ogoglio.message.proto;
+
+
+public class CometInfo implements AsyncProtoInfo {
+
+ public CometInfo() {
+ }
+
+ public Object getProxySpecificSelector() {
+ return "/proxy";
+ }
+
+ public String getScheme() {
+ return "comet";
+ }
+
+ public Object getSimSpecificSelector() {
+ return "/sim";
+ }
+
+}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometProto.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometProto.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometProto.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -0,0 +1,88 @@
+package com.ogoglio.message.proto;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Writer;
+
+import com.ogoglio.util.Log;
+
+public class CometProto implements AsyncProto {
+ private Writer writer;
+ private InputStream input;
+ private Locator local;
+ private Locator remote;
+
+ private StringBuffer waiting=new StringBuffer();
+
+ public CometProto(Writer writer, InputStream input, Locator remote, Locator local) {
+ this.writer=writer;
+ this.input=input;
+ this.local=local;
+ this.remote=remote;
+ }
+
+ public Locator getLocalLocator() {
+ return local;
+ }
+
+ public Locator getRemoteLocator() {
+ return remote;
+ }
+
+ public void prepareInput() throws IOException {
+ //no effect in comet
+ }
+
+ public void prepareOutput() throws IOException {
+ //no effect in comment
+ }
+
+ public String readLine() throws IOException {
+ byte[] buf = new byte[512];
+ waiting.delete(0, waiting.length());
+ do {
+ int n = input.read(buf); //can throw an IOException
+ if (n > 0) {
+ waiting.append(new String(buf, 0, n));
+ } else if (n < 0) {
+ Log.error("Unable to read from stream! Error: read returned < 0:"+n);
+ throw new IOException("Can't read from stream [<0 in read()]");
+ }
+ } while (input.available() >0);
+
+ if (waiting.indexOf("\n")!=waiting.length()) {
+ Log.error("Did not get a single string terminated by a newline!");
+ }
+ return waiting.toString();
+ }
+
+ public String readString(int length) throws IOException {
+ byte[] buf = new byte[512];
+ waiting.delete(0, waiting.length());
+ do {
+ int n = input.read(buf); //can throw an IOException
+ if (n > 0) {
+ waiting.append(new String(buf,0,n));
+ } else if (n < 0) {
+ Log.error("Unable to read from stream! Error: read returned < 0:"+n);
+ throw new IOException("Can't read from stream [<0 in read()]");
+ }
+ } while (waiting.length()<length);
+ return waiting.toString();
+ }
+
+ public void sendMessage(String command, String message) throws IOException {
+ writer.write(command + "\n");
+ writer.write(message);
+ }
+
+ public void shutdown() {
+ try {
+ input.close();
+ writer.close();
+ } catch (IOException e) {
+ Log.error("Error closing input/output streams of comet!",e);
+ }
+ }
+
+}
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketInfo.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketInfo.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketInfo.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -23,8 +23,4 @@
return new Integer(DEFAULT_SIM_PORT);
}
- public URI getURI(String host, Object selector) throws URISyntaxException {
- return new URI(getScheme() + "://" + host + ":" + selector+ "/");
- }
-
}
Modified: maven/trunk/ogoglio-server/pom.xml
===================================================================
--- maven/trunk/ogoglio-server/pom.xml 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-server/pom.xml 2007-10-27 23:33:47 UTC (rev 543)
@@ -196,6 +196,7 @@
<!-- these are for the populate -->
<configuration>
+
<serviceURI>${ogoglio.baseURL}</serviceURI>
<username>${ogoglio.bootstrapUser}</username>
<password>${ogoglio.bootstrapUserPW}</password>
Modified: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoServerFactory.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoServerFactory.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoServerFactory.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -1,12 +1,22 @@
package com.ogoglio.message.server;
import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import com.ogoglio.message.MessageHandler;
import com.ogoglio.message.proto.AsyncProtoInfo;
+import com.ogoglio.message.proto.CometInfo;
import com.ogoglio.message.proto.SimpleSocketInfo;
+import com.ogoglio.util.Log;
public class AsyncProtoServerFactory {
+ //this is the magic switch
+ private static final boolean USE_SIMPLE_SOCKET=true;
+ private static Map selectorToChannelManager = Collections.synchronizedMap(new HashMap());
+
/**
* Become a server who waits for connections.
*
@@ -17,11 +27,19 @@
* used when you want to not be a server anymore and want to clean up the resources.
*/
public static AsyncProtoShutdownHandle waitForClient(Object serverSelector, AsyncClientReady ready) throws IOException {
- return simpleSocketImpl(((Integer)serverSelector).intValue(),ready);
+ if (USE_SIMPLE_SOCKET) {
+ return simpleSocketImpl(((Integer)serverSelector).intValue(),ready);
+ } else {
+ return cometImpl(serverSelector, ready);
+ }
}
public static AsyncProtoInfo getInfo() {
- return simpleSocketInfo();
+ if (USE_SIMPLE_SOCKET) {
+ return simpleSocketInfo();
+ } else {
+ return cometInfo();
+ }
}
private static AsyncProtoInfo simpleSocketInfo() {
@@ -34,4 +52,33 @@
return waiter.getShutdownHandle();
}
+ private static AsyncProtoInfo cometInfo() {
+ return new CometInfo();
+ }
+
+ private static AsyncProtoShutdownHandle cometImpl(Object serverSelector, AsyncClientReady ready) {
+ String urlPart = (String)serverSelector;
+ CometChannelManager mgr = new CometChannelManager(urlPart,ready);
+ selectorToChannelManager.put(serverSelector, mgr);
+ return mgr;
+ }
+
+ public static boolean needsReaderThreadsOnServerSide() {
+ return USE_SIMPLE_SOCKET;
+ }
+
+ public static void registerMessageHandler(Object serverSelector, MessageHandler messageHandler) {
+ if (USE_SIMPLE_SOCKET) {
+ Log.error("Should NEVER call registerMessageHandler with simple socket proto!");
+ return;
+ }
+
+ if (!selectorToChannelManager.containsKey(serverSelector)) {
+ throw new IllegalArgumentException("Bad server selector provided to registerMessageHandler:"+serverSelector.getClass().getCanonicalName());
+ }
+
+ CometChannelManager mgr = (CometChannelManager) selectorToChannelManager.get(serverSelector);
+ mgr.setMessageHandler(messageHandler);
+ }
+
}
Added: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometChannelManager.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometChannelManager.java (rev 0)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometChannelManager.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -0,0 +1,56 @@
+package com.ogoglio.message.server;
+
+import com.ogoglio.message.MessageHandler;
+import com.ogoglio.message.proto.CometClient;
+import com.ogoglio.message.proto.CometInfo;
+import com.ogoglio.message.proto.CometProto;
+import com.ogoglio.message.proto.Locator;
+import com.ogoglio.util.Log;
+
+public class CometChannelManager implements AsyncProtoShutdownHandle {
+
+ private AsyncClientReady ready;
+ private CometProto proto;
+ private StringBuffer buffer=new StringBuffer();
+ private String urlPart;
+ private MessageHandler messageHandler;
+
+ public CometChannelManager(String urlPart, AsyncClientReady ready) {
+ this.ready=ready;
+ this.urlPart=urlPart;
+ CometServlet.addChannel(urlPart, this);
+ }
+
+ public Locator getLocalLocator() {
+ if (proto==null) {
+ String XXX_localMachine = "localhost";
+ String scheme = new CometInfo().getScheme();
+ String cometURL=scheme+"://"+XXX_localMachine+":"+CometClient.XXX_PORT+CometClient.XXX_MOUNT_POINT+urlPart;
+ return new Locator(cometURL);
+ }
+ return proto.getLocalLocator();
+ }
+
+ public AsyncClientReady getReady() {
+ return ready;
+ }
+
+ public void setConn(CometProto proto) {
+ this.proto=proto;
+ }
+ public void shutdown() {
+ if (proto!=null) {
+ proto.shutdown();
+ }
+ }
+ public void addData(byte[] buf, int n) {
+ //ugh, stupid inefficiency
+ String tmp=new String(buf,0,n);
+ buffer.append(tmp);
+ Log.info("COMET: Got some data in the buffer:"+buffer);
+ }
+
+ public void setMessageHandler(MessageHandler messageHandler) {
+ this.messageHandler=messageHandler;
+ }
+}
Modified: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometServlet.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometServlet.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometServlet.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -2,8 +2,14 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -12,28 +18,51 @@
import org.apache.catalina.CometEvent;
import org.apache.catalina.CometProcessor;
+import sun.reflect.ReflectionFactory.GetReflectionFactoryAction;
+import com.ogoglio.message.proto.CometProto;
+import com.ogoglio.message.proto.Locator;
import com.ogoglio.util.Log;
public class CometServlet extends HttpServlet implements CometProcessor {
- protected ArrayList connections = new ArrayList();
- protected MessageSender messageSender = null;
-
public void init() throws ServletException {
- messageSender = new MessageSender();
- Thread messageSenderThread = new Thread(messageSender, "MessageSender[" + getServletContext() + "]");
- messageSenderThread.setDaemon(true);
- messageSenderThread.start();
+ //TEMP
+ //MeasPerfServer measServer=new MeasPerfServer();
+ //NetworkChannelServer ncs=new NetworkChannelServer(measServer,"/fart/poop",false,measServer);
+ //measServer.channelServer(ncs);
}
public void destroy() {
- connections.clear();
- messageSender.stop();
- messageSender = null;
+ for (Iterator iterator = clientChannelMap.keySet().iterator(); iterator.hasNext();) {
+ String subspace = (String) iterator.next();
+ List mgrList = (List)clientChannelMap.get(subspace);
+ for (Iterator iterator2 = mgrList.iterator(); iterator2.hasNext();) {
+ CometChannelManager mgr = (CometChannelManager) iterator2.next();
+ mgr.shutdown();
+ }
+ }
}
+ private static Map clientChannelMap=Collections.synchronizedMap(new HashMap());
+ public static void addChannel(String subspace, CometChannelManager mgr) {
+ if (!clientChannelMap.containsKey(subspace)) {
+ clientChannelMap.put(subspace, Collections.synchronizedList(new ArrayList()));
+ Log.info("Comet listener on space:"+subspace);
+ }
+ List clienChannList = (List)clientChannelMap.get(subspace);
+ clienChannList.add(mgr);
+ }
+ public static void removeChannel(String subspace, CometChannelManager mgr) {
+ if (!clientChannelMap.containsKey(subspace)) {
+ Log.error("Tried to remove a channel for a subspace we don't have in CometServlet:"+subspace);
+ } else {
+ List clientChannList = (List)clientChannelMap.get(subspace);
+ clientChannList.remove(mgr);
+ }
+ }
+
/**
* Process the given Comet event.
*
@@ -47,119 +76,79 @@
if (event.getEventType() == CometEvent.EventType.BEGIN) {
//event.setTimeout(60000);
Log.info("Begin for session: " + request.getSession(true).getId());
- PrintWriter writer = response.getWriter();
- response.setContentType("text/plain");
- writer.println("<!doctype html public \"-//w3c//dtd html 4.0 transitional//en\">");
- writer.println("<head><title>JSP Chat</title></head><body bgcolor=\"#FFFFFF\">");
- writer.flush();
- synchronized (connections) {
- connections.add(response);
+ String path = request.getPathInfo();
+ if (clientChannelMap.containsKey(path)) {
+ List channList = (List)clientChannelMap.get(path);
+ for (Iterator iterator = channList.iterator(); iterator.hasNext();) {
+ CometChannelManager mgr= (CometChannelManager)iterator.next();
+ prepareComet(request, response, mgr);
+ }
+ } else {
+ Log.warn("Path "+path+" request for comet but nobody interested....");
+ Log.info("Paths of interest to somebody in comet:");
+ Iterator iterator=clientChannelMap.keySet().iterator();
+ while (iterator.hasNext()) {
+ String p= (String ) iterator.next();
+ Log.info("Interested in comet path:"+p);
+ }
}
- //drainDataBuffer(event, request);
} else if (event.getEventType() == CometEvent.EventType.ERROR) {
Log.error("Error for session: " + request.getSession(true).getId()+"-->"+event.getEventSubType());
if (event.getEventSubType()==CometEvent.EventSubType.TIMEOUT) {
Log.info("Timeout ignored!");
return;
}
- synchronized (connections) {
- connections.remove(response);
- }
event.close();
} else if (event.getEventType() == CometEvent.EventType.END) {
Log.info("End for session: " + request.getSession(true).getId());
- synchronized (connections) {
- connections.remove(response);
- }
- PrintWriter writer = response.getWriter();
- writer.println("</body></html>");
event.close();
} else if (event.getEventType() == CometEvent.EventType.READ) {
+ Log.error("Comet: READ ready: "+request.getPathInfo());
drainDataBuffer(event, request);
}
}
+ private void prepareComet(HttpServletRequest request, HttpServletResponse response, CometChannelManager mgr) throws IOException {
+ String localString = AsyncProtoServerFactory.getInfo().getScheme() +"://" +
+ request.getServerName() + ":" +
+ request.getServerPort()+
+ request.getContextPath()+
+ request.getServletPath()+
+ request.getPathInfo();
+ String user;
+ if (request.getRemoteUser()==null) {
+ user="UNKNOWN";
+ } else {
+ user =request.getRemoteUser();
+ }
+ String remoteString = AsyncProtoServerFactory.getInfo().getScheme() +"://" +
+ user+"@"+
+ request.getRemoteHost() + ":" +
+ request.getRemotePort()+"/";
+ Locator local=new Locator(localString);
+ Locator remote=new Locator(remoteString);
+ CometProto proto=new CometProto(response.getWriter(),request.getInputStream(), remote, local);
+ AsyncClientReady ready=mgr.getReady();
+ mgr.setConn(proto);
+ ready.clientReady(proto);
+ }
+
private void drainDataBuffer(CometEvent event, HttpServletRequest request) throws IOException {
InputStream is = request.getInputStream();
byte[] buf = new byte[512];
do {
+ List mgrList = (List)clientChannelMap.get(request.getPathInfo());
int n = is.read(buf); //can throw an IOException
if (n > 0) {
- Log.info("Read " + n + " bytes: " + new String(buf, 0, n) + " for session: " + request.getSession(true).getId());
- String msg=new String(buf);
- String sender=msg.substring(0,msg.indexOf(':'));
- String message=msg.substring(msg.indexOf(':')+1);
- messageSender.send(sender, message);
+ Log.info("Read " + n + " bytes for session: " + request.getSession(true).getId());
+ for (Iterator iterator = mgrList.iterator(); iterator.hasNext();) {
+ CometChannelManager mgr = (CometChannelManager) iterator.next();
+ mgr.addData(buf,n);
+ }
} else if (n < 0) {
Log.error("Read error:"+event.getEventType()+"," +request.getRequestURI()+","+n);
return;
}
} while (is.available() > 0);
}
-
- public class MessageSender implements Runnable {
-
- protected boolean running = true;
-
- protected ArrayList messages = new ArrayList();
-
- public MessageSender() {
- }
-
- public void stop() {
- running = false;
- }
-
- /**
- * Add message for sending.
- */
- public void send(String user, String message) {
- Log.info("FART:adding"+message +" from "+user);
- synchronized (messages) {
- messages.add("[" + user + "]: " + message);
- messages.notify();
- }
- }
-
- public void run() {
-
- while (running) {
-
- if (messages.size() == 0) {
- try {
- synchronized (messages) {
- messages.wait();
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- }
-
- synchronized (connections) {
- String[] pendingMessages = null;
- synchronized (messages) {
- pendingMessages = (String[]) messages.toArray(new String[0]);
- messages.clear();
- }
- // Send any pending message on all the open connections
- Log.info("FART currently need to send on "+connections.size()+" connections with "+pendingMessages.length);
- for (int i = 0; i < connections.size(); i++) {
- try {
- PrintWriter writer = ((HttpServletResponse) connections.get(i)).getWriter();
- for (int j = 0; j < pendingMessages.length; j++) {
- writer.println(pendingMessages[j] + "<br>");
- }
- writer.flush();
- } catch (IOException e) {
- Log.error("IOExeption sending message", e);
- }
- }
- }
-
- }
-
- }
-
- }
-
}
Modified: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/MeasPerfServer.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/MeasPerfServer.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/MeasPerfServer.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -5,7 +5,6 @@
import com.ogoglio.message.NoSuchDestinationException;
import com.ogoglio.message.TCPChannel;
import com.ogoglio.message.proto.AsyncProtoInfo;
-import com.ogoglio.message.server.NetworkChannelServer.Listener;
public class MeasPerfServer implements MessageHandler, NetworkChannelServer.Listener {
@@ -39,10 +38,14 @@
}
public void channelAdded(TCPChannel channel) {
- System.out.println("Added a TCP Channel:"+channel.getRemoteLocator()+" ["+ncs.getChannels().length+"]");
+ System.out.println("Added a TCP Channel:"+channel.getRemoteLocator() + " [of "+ncs.getChannels().length+"]");
}
public void channelRemoved(TCPChannel channel) {
- System.out.println("Removed a TCP Channel:"+channel.getRemoteLocator()+" ["+ncs.getChannels().length+"]");
+ System.out.println("Removed a TCP Channel:"+channel.getRemoteLocator() + " [of "+ncs.getChannels().length+"]");
}
+
+ public void channelServer(NetworkChannelServer server) {
+ ncs=server;
+ }
}
Modified: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/NetworkChannelServer.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/NetworkChannelServer.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/NetworkChannelServer.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -26,7 +26,6 @@
import com.ogoglio.message.proto.Locator;
import com.ogoglio.util.BlockingQueue;
import com.ogoglio.util.Log;
-import com.ogoglio.util.NetworkUtils;
public class NetworkChannelServer implements TCPChannel.Listener, AsyncClientReady {
@@ -40,14 +39,18 @@
private Listener listener;
+ private Object serverSelector;
+
public NetworkChannelServer(MessageHandler messageHandler, Object serverSelector, boolean ensureOrigin, Listener listener) {
try {
+ Log.info("Network channel server started up with comet selector:"+serverSelector);
handle=AsyncProtoServerFactory.waitForClient(serverSelector, this);
} catch (IOException e) {
throw new IllegalStateException("Could not open a server socket: " + e);
}
this.messageHandler = messageHandler;
this.ensureOrigin = ensureOrigin;
+ this.serverSelector = serverSelector;
if (listener == null) {
throw new IllegalArgumentException("bad listener " + listener);
}
@@ -124,8 +127,15 @@
}
public void clientReady(AsyncProto clientProto) {
- TCPChannel channel = new TCPChannel(clientProto, messageHandler, ensureOrigin, NetworkChannelServer.this);
+ boolean spawnThread = AsyncProtoServerFactory.needsReaderThreadsOnServerSide();
+ TCPChannel channel = new TCPChannel(clientProto, messageHandler, ensureOrigin, NetworkChannelServer.this,
+ spawnThread);
addChannel(channel);
+ if (!spawnThread) {
+ //if the protocol needed us to spawn a thread, we would have done so inside the
+ //tcp channel creation above. if not, register the handler here
+ AsyncProtoServerFactory.registerMessageHandler(serverSelector,messageHandler);
+ }
listener.channelAdded(channel);
}
Modified: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/SimpleSocketAsyncServer.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/SimpleSocketAsyncServer.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/SimpleSocketAsyncServer.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -9,7 +9,6 @@
import com.ogoglio.message.proto.Locator;
import com.ogoglio.message.proto.SimpleSocketAsync;
import com.ogoglio.util.Log;
-import com.ogoglio.util.NetworkUtils;
/*
* This is never visible to the outside world, it just makes it simpler to understand how
Modified: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/MessageProxy.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/MessageProxy.java 2007-10-22 23:58:51 UTC (rev 542)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/MessageProxy.java 2007-10-27 23:33:47 UTC (rev 543)
@@ -298,7 +298,7 @@
try {
Object selector=AsyncProtoFactory.getDefaultInfo().getSimSpecificSelector();
AsyncProto proto=AsyncProtoFactory.getDefaultClient(uri.getHost(), selector);
- simChannel = new TCPChannel(proto, simMessageHandler, false, simMessageHandler);
+ simChannel = new TCPChannel(proto, simMessageHandler, false, simMessageHandler, true);
} catch (IOException e) {
e.printStackTrace();
throw new NoSuchDestinationException("Could not open a channel to " + uri);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|