|
From: <ian...@us...> - 2007-10-10 23:31:35
|
Revision: 490
http://ogoglio.svn.sourceforge.net/ogoglio/?rev=490&view=rev
Author: iansmith
Date: 2007-10-10 16:31:38 -0700 (Wed, 10 Oct 2007)
Log Message:
-----------
Refactored the existing socket code into its own package; it passes all the tests against the new abstract API (AsyncProto).
No functional effect, but this gets us ready to switch to a new underlying protocol, Comet, real soon now.
If you are not running on the Apache Portable Runtime, the CometServlet won't do anything. Since it is not in the code path right now, it's not a functional change.
Modified Paths:
--------------
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NetworkChannelServer.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java
maven/trunk/ogoglio-server/pom.xml
maven/trunk/ogoglio-server/src/main/webapp/WEB-INF/web.xml
Added Paths:
-----------
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProto.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoServer.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsync.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsyncServer.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/CometServlet.java
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NetworkChannelServer.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NetworkChannelServer.java 2007-10-08 12:51:24 UTC (rev 489)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/NetworkChannelServer.java 2007-10-10 23:31:38 UTC (rev 490)
@@ -19,6 +19,9 @@
import java.net.Socket;
import java.util.Vector;
+import com.ogoglio.message.proto.AsyncProto;
+import com.ogoglio.message.proto.AsyncProtoFactory;
+import com.ogoglio.message.proto.AsyncProtoServer;
import com.ogoglio.util.BlockingQueue;
import com.ogoglio.util.Log;
import com.ogoglio.util.NetworkUtils;
@@ -27,7 +30,7 @@
private Vector channels = new Vector();
- private ServerSocket serverSocket = null;
+ private AsyncProtoServer serverProto = null;
private ChannelSocketListenerThread listenerThread = new ChannelSocketListenerThread();
@@ -35,21 +38,13 @@
private MessageHandler messageHandler = null;
- private Listener listener = null;
-
private boolean ensureOrigin = false;
- public NetworkChannelServer(MessageHandler messageHandler, boolean ensureOrigin, Listener listener) {
- this(messageHandler, 0, ensureOrigin, listener);
- }
-
+ private Listener listener;
+
public NetworkChannelServer(MessageHandler messageHandler, int port, boolean ensureOrigin, Listener listener) {
try {
- if (port == -1) {
- serverSocket = new ServerSocket(0);
- } else {
- serverSocket = new ServerSocket(port);
- }
+ serverProto= AsyncProtoFactory.getDefaultServer(port);
} catch (IOException e) {
throw new IllegalStateException("Could not open a server socket: " + e);
}
@@ -58,13 +53,12 @@
if (listener == null) {
throw new IllegalArgumentException("bad listener " + listener);
}
- this.listener = listener;
+ this.listener=listener;
listenerThread.start();
}
public interface Listener {
public void channelAdded(TCPChannel channel);
-
public void channelRemoved(TCPChannel channel);
}
@@ -115,13 +109,13 @@
}
public Locator getLocator() {
- return new Locator(NetworkUtils.getLocalHostAddress(), serverSocket.getLocalPort());
+ return new Locator(NetworkUtils.getLocalHostAddress(), serverProto.getLocalPort());
}
public void cleanup() {
cleaned = true;
try {
- serverSocket.close();
+ serverProto.shutdown();
} catch (Exception e) {
Log.info("Trying to close server socket of NCServer",e);
// don't care
@@ -142,11 +136,11 @@
public void run() {
while (!cleaned) {
try {
- Socket clientSocket = serverSocket.accept();
- if (clientSocket == null) {
+ AsyncProto clientProto = serverProto.waitForClient();
+ if (clientProto == null) {
break;
}
- TCPChannel channel = new TCPChannel(clientSocket, messageHandler, ensureOrigin, NetworkChannelServer.this);
+ TCPChannel channel = new TCPChannel(clientProto, messageHandler, ensureOrigin, NetworkChannelServer.this);
addChannel(channel);
listener.channelAdded(channel);
} catch (IOException e) {
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java 2007-10-08 12:51:24 UTC (rev 489)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java 2007-10-10 23:31:38 UTC (rev 490)
@@ -18,6 +18,7 @@
import java.io.OutputStream;
import java.net.Socket;
+import com.ogoglio.message.proto.AsyncProto;
import com.ogoglio.util.BlockingQueue;
import com.ogoglio.util.Log;
@@ -26,16 +27,15 @@
private BlockingQueue messageQueue = new BlockingQueue();
- private Socket clientSocket = null;
+ private AsyncProto clientProto;
- private OutputStream socketOutput = null;
-
private boolean cleaned = false;
- public SenderQueue(Socket clientSocket, int maxSize) {
+ public SenderQueue(AsyncProto clientProto, int maxSize) {
messageQueue.setMaxSize(maxSize);
+ this.clientProto = clientProto;
try {
- socketOutput = clientSocket.getOutputStream();
+ this.clientProto.prepareOutput();
} catch (IOException e) {
throw new IllegalStateException("Could not get socket output stream: " + e);
}
@@ -47,18 +47,9 @@
public void cleanup() {
cleaned = true;
- try {
- if (clientSocket!=null) {
- clientSocket.close();
- }
- if (socketOutput!=null) {
- socketOutput.close();
- }
- if (messageQueue!=null) {
- messageQueue.close();
- }
- } catch (IOException e) {
- Log.info("IOException trying cleanup SenderQueue",e);
+ clientProto.shutdown();
+ if (messageQueue != null) {
+ messageQueue.close();
}
}
@@ -70,8 +61,7 @@
String messageString = message.toString();
Command command = new Command(Command.MESSAGE, messageString.length());
try {
- socketOutput.write((command + "\n").getBytes());
- socketOutput.write(messageString.getBytes());
+ clientProto.sendMessage(command.toString(),messageString);
} catch (IOException e) {
if (!cleaned) {
e.printStackTrace();
@@ -92,7 +82,7 @@
Message message = (Message) messageQueue.dequeue();
unsafeSendMessage(message);
} catch (Throwable e) {
- Log.error("Could not send message",e);
+ Log.error("Could not send message", e);
break;
}
}
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java 2007-10-08 12:51:24 UTC (rev 489)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java 2007-10-10 23:31:38 UTC (rev 490)
@@ -15,9 +15,9 @@
package com.ogoglio.message;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
+import com.ogoglio.message.proto.AsyncProto;
+import com.ogoglio.message.proto.AsyncProtoFactory;
import com.ogoglio.util.Log;
import com.ogoglio.util.NetworkUtils;
import com.ogoglio.util.BlockingQueue.QueueClosedException;
@@ -25,7 +25,7 @@
public class TCPChannel implements TCPMessageReader.Listener {
- private Socket clientSocket = null;
+ private AsyncProto clientProto = null;
private String remoteHostName = null;
@@ -44,13 +44,13 @@
private boolean ensureOrigin = false;
public TCPChannel(String remoteHost, int remotePort, MessageHandler messageHandler, boolean ensureOrigin, Listener listener) throws IOException {
- this(new Socket(remoteHost, remotePort), messageHandler, ensureOrigin, listener);
+ this(AsyncProtoFactory.getDefaultClient(remoteHost, remotePort), messageHandler, ensureOrigin, listener);
}
- public TCPChannel(Socket clientSocket, MessageHandler message_handler, boolean ensureOrigin, Listener listener) {
- this.clientSocket = clientSocket;
- remoteHostName = ((InetSocketAddress) clientSocket.getRemoteSocketAddress()).getAddress().getHostAddress();
- remoteHostPort = ((InetSocketAddress) clientSocket.getRemoteSocketAddress()).getPort();
+ public TCPChannel(AsyncProto proto, MessageHandler message_handler, boolean ensureOrigin, Listener listener) {
+ this.clientProto= proto;
+ remoteHostName = clientProto.getRemoteAddress().getAddress().getHostAddress();
+ remoteHostPort = clientProto.getRemoteAddress().getPort();
if (message_handler == null) {
throw new IllegalArgumentException("bad message handler " + message_handler);
}
@@ -62,10 +62,10 @@
this.listener = listener;
// TODO Don't spand two threads for each socket! No effing way!
- senderQueue = new SenderQueue(clientSocket, 1000); //TODO what should the max queue size be?
+ senderQueue = new SenderQueue(clientProto, 1000); //TODO what should the max queue size be?
senderQueue.start();
- readerThread = new TCPMessageReader(clientSocket, message_handler, this);
+ readerThread = new TCPMessageReader(clientProto, message_handler, this);
readerThread.start();
}
@@ -75,7 +75,7 @@
}
public Locator getLocalLocator() {
- return new Locator(NetworkUtils.getLocalHostAddress(), clientSocket.getLocalPort());
+ return new Locator(NetworkUtils.getLocalHostAddress(), clientProto.getLocalPort());
}
public Locator getRemoteLocator() {
@@ -122,7 +122,7 @@
}
public String toString() {
- return "TCPChannel from " + NetworkUtils.getLocalHostAddress() + ":" + clientSocket.getLocalPort() + " to " + remoteHostName + ":" + remoteHostPort;
+ return "TCPChannel from " + NetworkUtils.getLocalHostAddress() + ":" + clientProto.getLocalPort() + " to " + remoteHostName + ":" + remoteHostPort;
}
public void socketClosed() {
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java 2007-10-08 12:51:24 UTC (rev 489)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java 2007-10-10 23:31:38 UTC (rev 490)
@@ -15,30 +15,14 @@
package com.ogoglio.message;
import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
+import com.ogoglio.message.proto.AsyncProto;
import com.ogoglio.util.Log;
public class TCPMessageReader extends Thread {
- //TODO make this not suck ass
-
- public static int MAX_COMMAND_LENGTH = 2048; //TODO decide whether 2048 bytes is enough for a max command length
-
- public static int MAX_MESSAGE_LENGTH = 2048; //TODO decide whether 2048 bytes is enough for a max message length
-
private boolean cleaned = false;
- private boolean inCommandState = true;
-
- private StringBuffer commandBuffer = new StringBuffer();
-
- private StringBuffer messageBuffer = new StringBuffer();
-
- private InputStream socketInput = null;
-
private MessageHandler messageHandler = null;
private TCPChannel channel = null;
@@ -46,20 +30,17 @@
private String remoteHostName = null;
private int remotePort = -1;
+
+ private AsyncProto clientProto=null;
- public TCPMessageReader(Socket clientSocket, MessageHandler messageHandler, TCPChannel channel) {
+ public TCPMessageReader(AsyncProto clientProto, MessageHandler messageHandler, TCPChannel channel) {
super("TCPMessageReader");
setDaemon(true);
- if (clientSocket == null) {
- throw new IllegalArgumentException("bad socket " + clientSocket);
+ if (clientProto == null) {
+ throw new IllegalArgumentException("bad protocol to TCPMessageReader" + clientProto);
}
- try {
- socketInput = clientSocket.getInputStream();
- } catch (IOException e) {
- throw new IllegalStateException("Couldn't get client socket input stream " + e);
- }
- remoteHostName = ((InetSocketAddress) clientSocket.getRemoteSocketAddress()).getAddress().getHostAddress();
- remotePort = ((InetSocketAddress) clientSocket.getRemoteSocketAddress()).getPort();
+ remoteHostName = clientProto.getRemoteAddress().getAddress().getHostAddress();
+ remotePort = clientProto.getRemoteAddress().getPort();
if (messageHandler == null) {
throw new IllegalArgumentException("bad message handler: " + messageHandler);
}
@@ -68,6 +49,14 @@
throw new IllegalArgumentException("bad listener " + channel);
}
this.channel = channel;
+ this.clientProto = clientProto;
+
+ try {
+ this.clientProto.prepareInput();
+ } catch (IOException e) {
+ throw new IllegalStateException("Couldn't get client socket input stream " + e);
+ }
+
}
public interface Listener {
@@ -76,65 +65,38 @@
public void cleanup() {
cleaned = true;
- try {
- if (socketInput!=null) {
- socketInput.close();
- }
- } catch (IOException e) {
- Log.info("IOException caught trying to clean up TCPMessageReader",e);
- }
+
+ clientProto.shutdown();
}
public void run() {
try {
Command command = new Command();
while (!cleaned) {
- int inInt = socketInput.read(); //TODO speed up message reading by using a receiving buffer
+ String commandLine = clientProto.readLine();
if (cleaned) {
- return;
+ return;//somebody shut us down during our wait
}
- if (inInt == -1) {
- channel.socketClosed();
+ if (commandLine==null) {
+ channel.socketClosed(); //other side went bye-bye
return;
}
- char inChar = (char) inInt;
- if (inCommandState) {
- if (commandBuffer.length() > MAX_COMMAND_LENGTH - 1) {
- throw new IllegalStateException("Command exceeds max length: " + commandBuffer);
- }
-
- if (inChar == '\r') { // this shouldn't happen, but people are silly
- continue;
- }
- if (inChar != '\n') {
- commandBuffer.append(inChar);
- } else {
- command.reset(commandBuffer.toString());
- commandBuffer.delete(0, commandBuffer.length());
- inCommandState = false;
- }
- } else {
- if (messageBuffer.length() > MAX_MESSAGE_LENGTH - 1) {
- throw new IllegalStateException("Message exceeds max length: " + messageBuffer);
- }
-
- messageBuffer.append(inChar);
- if (messageBuffer.length() == command.getMessageLength()) {
- Message message = Message.parseMessage(messageBuffer.toString());
- if (channel.ensureOrigin()) {
- message.getOrigin().setHost(remoteHostName);
- message.getOrigin().setPort(remotePort);
- }
- try {
- messageHandler.handleMessage(message, channel);
- } catch (Throwable e) {
- Log.error("Error handling message",e);
- e.printStackTrace();
- }
- messageBuffer.delete(0, messageBuffer.length());
- inCommandState = true;
- }
+ command.reset(commandLine);
+ if (!command.getType().equals(Command.MESSAGE)) {
+ throw new IllegalStateException("Whoa! Bad message type!"+command.getType());
}
+ String msg = clientProto.readString(command.getMessageLength());
+ Message message = Message.parseMessage(msg);
+ if (channel.ensureOrigin()) {
+ message.getOrigin().setHost(remoteHostName);
+ message.getOrigin().setPort(remotePort);
+ }
+ try {
+ messageHandler.handleMessage(message, channel);
+ } catch (Throwable e) {
+ Log.error("Error handling message",e);
+ e.printStackTrace();
+ }
}
} catch (Exception e) {
if (!cleaned) {
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProto.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProto.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProto.java 2007-10-10 23:31:38 UTC (rev 490)
@@ -0,0 +1,71 @@
+package com.ogoglio.message.proto;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * A bidirectional message protocol between two connected endpoints. The simplest
+ * implementation sits directly on a socket ({@link SimpleSocketAsync}), but the same
+ * contract can be layered over something else (in our case, COMET). Once the two sides
+ * are connected, BOTH the client and the server talk through this interface.
+ *
+ * @author iansmith
+ */
+public interface AsyncProto {
+
+    /**
+     * @return the address of the endpoint this protocol instance is speaking to
+     */
+    InetSocketAddress getRemoteAddress();
+
+    /**
+     * @return the local port this protocol instance is talking on
+     */
+    int getLocalPort();
+
+    /**
+     * Deals with the various shutdown issues of the underlying transport.
+     */
+    void shutdown();
+
+    /**
+     * Pumps out one message preceded by the given command header.
+     *
+     * @param command the command header
+     * @param message the message payload
+     * @throws IOException if the message could not be written
+     */
+    void sendMessage(String command, String message) throws IOException;
+
+    /**
+     * Reads one line of text. NOTE(review): this contract was written as "terminated by
+     * CRLF", but SimpleSocketAsync ends a line at '\n' alone and discards any '\r'.
+     *
+     * @return the line, "" if the line was empty, or null if the socket got closed
+     * @throws IOException if something really unexpected happened
+     */
+    String readLine() throws IOException;
+
+    /**
+     * Reads a string that is known to be a given number of bytes.
+     *
+     * @param length the number of bytes to read
+     * @return the string, "" for a zero length string, or null if the socket closes
+     * @throws IOException if something bad happened
+     */
+    String readString(int length) throws IOException;
+
+    /**
+     * Ensures that we are ready for writing to the output.
+     *
+     * @throws IOException if the output could not be made ready
+     */
+    void prepareOutput() throws IOException;
+
+    /**
+     * Ensures that we are ready for reading from the input.
+     *
+     * @throws IOException if the input could not be made ready
+     */
+    void prepareInput() throws IOException;
+}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java 2007-10-10 23:31:38 UTC (rev 490)
@@ -0,0 +1,39 @@
+package com.ogoglio.message.proto;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+/**
+ * The one place where other classes get a handle on our async protocol handlers, so that
+ * we can "throw the switch" here, and only here, if we want to switch protocols.
+ *
+ * @author iansmith
+ */
+public class AsyncProtoFactory {
+
+    /**
+     * Client side call: gets a handler for the default protocol.
+     *
+     * @param host the remote host to connect to
+     * @param port the remote port to connect to
+     * @return the protocol handler, currently a {@link SimpleSocketAsync}
+     * @throws IOException if the handler could not be created
+     */
+    public static AsyncProto getDefaultClient(String host, int port) throws IOException {
+        AsyncProto client = new SimpleSocketAsync(host, port);
+        return client;
+    }
+
+    /**
+     * Server side call: becomes a server that waits for connections.
+     *
+     * @param port the port to wait on; note that you may not end up waiting on that port
+     *        if the underlying protocol doesn't need to
+     * @return the server, currently a {@link SimpleSocketAsyncServer}
+     * @throws IOException if the server could not be created
+     */
+    public static AsyncProtoServer getDefaultServer(int port) throws IOException {
+        AsyncProtoServer server = new SimpleSocketAsyncServer(port);
+        return server;
+    }
+}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoServer.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoServer.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoServer.java 2007-10-10 23:31:38 UTC (rev 490)
@@ -0,0 +1,30 @@
+package com.ogoglio.message.proto;
+
+import java.io.IOException;
+
+/**
+ * Server half of the async protocol: waits for clients and hands each one back as an
+ * {@link AsyncProto}.
+ */
+public interface AsyncProtoServer {
+
+    /**
+     * Waits for a client to show up and connect to us.
+     *
+     * @return the protocol object, of the right type, for the client that connected
+     * @throws IOException if waiting for the client failed
+     */
+    AsyncProto waitForClient() throws IOException;
+
+    /**
+     * Cleans up.
+     *
+     * @throws IOException if the clean up failed
+     */
+    void shutdown() throws IOException;
+
+    /**
+     * @return the local port, if there is one
+     */
+    int getLocalPort();
+}
\ No newline at end of file
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java 2007-10-10 23:31:38 UTC (rev 490)
@@ -0,0 +1,103 @@
+package com.ogoglio.message.proto;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.HttpURLConnection;
+import java.net.Socket;
+import java.net.URL;
+import java.net.URLConnection;
+
+import com.ogoglio.message.MessageHandler;
+import com.ogoglio.message.TCPChannel;
+import com.ogoglio.message.TCPChannel.Listener;
+
+/**
+ * Bare-bones comet client: opens a raw socket, writes the header of an HTTP POST by hand
+ * and then exposes the socket streams to the caller.
+ */
+public class CometClient {
+
+    //comet works over HTTP
+    private HttpURLConnection connection;
+
+    //most people just want to read/write to it
+    public OutputStream os;
+    public InputStream is;
+
+    private static final String CRLF="\r\n";
+    private static final String EOC = "0"+CRLF+CRLF; //end of chunk
+
+    /**
+     * Connects to the server and sends the request header of a POST to /og/comet. The
+     * request body is left for the caller to write to {@link #os}.
+     *
+     * @param remoteHost host to connect to
+     * @param remotePort port to connect to
+     * @param messageHandler not used by this constructor
+     * @param ensureOrigin not used by this constructor
+     * @param listener not used by this constructor
+     * @throws IOException if the socket could not be opened or written to
+     */
+    public CometClient(String remoteHost, int remotePort, MessageHandler messageHandler, boolean ensureOrigin, Listener listener) throws IOException {
+        Socket socket=new Socket(remoteHost,remotePort);
+        is = socket.getInputStream();
+        os = socket.getOutputStream();
+
+        StringBuffer buff=new StringBuffer();
+        buff.append("POST /og/comet HTTP/1.1"+CRLF);
+        buff.append("content-type: text/plain"+CRLF);
+        buff.append("host: "+remoteHost+":"+remotePort+CRLF);
+        buff.append("connection: keep-alive"+CRLF);
+        buff.append("user-agent: ogoglio/viewer"+CRLF);
+        //if you omit this, the server generates the "end" immediately
+        //NOTE(review): 17 is hard-coded and need not match what the caller writes
+        buff.append("content-length: 17"+CRLF);
+        //buff.append("accept= text/plain"+CRLF);
+        buff.append("method: POST"+CRLF);
+        //buff.append("transfer-encoding= chunked"+CRLF);
+        buff.append(CRLF);
+        os.write(buff.toString().getBytes());
+        os.flush();
+    }
+
+    /**
+     * Sends "sender:message" to a server on localhost:8080 and echoes every line of the
+     * response until the server closes the connection.
+     *
+     * @param argv argv[0] is the sender, argv[1] is the message
+     */
+    public static void main(String[] argv) {
+        if (argv.length < 2) {
+            System.out.println("usage: CometClient <sender> <message>");
+            return;
+        }
+        try {
+            CometClient client=new CometClient("localhost",8080,null,false,null);
+            try {
+                PrintWriter pr=new PrintWriter(new OutputStreamWriter(client.os));
+                pr.print(argv[0]+":"+argv[1]+"\n");
+                pr.flush();
+
+                BufferedReader rd=new BufferedReader(new InputStreamReader(client.is));
+                //readLine() returns null at end of stream; the loop used to test rd,
+                //which is never null, and so never terminated
+                String line=rd.readLine();
+                while (line!=null) {
+                    System.out.println("<--:"+line);
+                    line=rd.readLine();
+                }
+            } finally {
+                //closing a socket's stream closes the socket as well
+                client.is.close();
+            }
+        } catch (IOException e) {
+            System.out.println("WHOA! IO EXCEPTION");
+            e.printStackTrace();
+        }
+    }
+}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsync.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsync.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsync.java 2007-10-10 23:31:38 UTC (rev 490)
@@ -0,0 +1,143 @@
+package com.ogoglio.message.proto;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import com.ogoglio.util.Log;
+
+/**
+ * This is a simple implementation of a custom async proto that works on a standard TCP
+ * socket in the naive way: a command line ended by '\n', followed by a message whose
+ * length the caller already knows.
+ *
+ * @author iansmith
+ */
+public class SimpleSocketAsync implements AsyncProto {
+
+    public static int MAX_COMMAND_LENGTH = 2048; //TODO decide whether 2048 bytes is enough for a max command length
+
+    public static int MAX_MESSAGE_LENGTH = 2048; //TODO decide whether 2048 bytes is enough for a max message length
+
+    private Socket socket = null;
+
+    private OutputStream socketOutput = null;
+
+    private InputStream socketInput = null;
+
+    /**
+     * Client side: connects a new socket to the given host and port.
+     *
+     * @throws IOException if the connection could not be made
+     */
+    public SimpleSocketAsync(String host, int port) throws IOException {
+        this(new Socket(host, port));
+    }
+
+    /**
+     * Wraps an already connected socket.
+     */
+    public SimpleSocketAsync(Socket socket) {
+        this.socket=socket;
+    }
+
+    public InetSocketAddress getRemoteAddress() {
+        return (InetSocketAddress) socket.getRemoteSocketAddress();
+    }
+
+    public int getLocalPort() {
+        return socket.getLocalPort();
+    }
+
+    /**
+     * Closes the socket; a failure to close is logged rather than thrown.
+     */
+    public void shutdown() {
+        try {
+            if (socket != null) {
+                socket.close();
+            }
+        } catch (IOException e) {
+            Log.error("Unable to cleanup and shutdown SimpleSocketAsyncProto!", e);
+        }
+    }
+
+    /**
+     * Writes the command followed by '\n', then the message.
+     *
+     * @throws IOException if the socket could not be written to
+     */
+    public void sendMessage(String command, String message) throws IOException {
+        prepareOutput(); //no-op when the stream is already there; avoids an NPE otherwise
+        socketOutput.write((command + "\n").getBytes());
+        socketOutput.write(message.getBytes());
+    }
+
+    /**
+     * Reads up to the next '\n', dropping every '\r' on the way.
+     *
+     * @return the line without its terminator, or null if the stream ended first
+     * @throws IOException if the socket could not be read
+     * @throws IllegalStateException if the line outgrows MAX_COMMAND_LENGTH
+     */
+    public String readLine() throws IOException {
+        prepareInput(); //no-op when the stream is already there; avoids an NPE otherwise
+        //a buffer per call, so an aborted read cannot leak into the next one
+        StringBuffer line = new StringBuffer();
+        while (true) {
+            int inInt = socketInput.read(); //TODO speed up message reading by using a receiving buffer
+            if (inInt == -1) {
+                return null;
+            }
+            if (line.length() > MAX_COMMAND_LENGTH - 1) {
+                throw new IllegalStateException("Command exceeds max length: " + line);
+            }
+            char inChar = (char) inInt;
+            if (inChar == '\r') { // this shouldn't happen, but people are silly
+                continue;
+            }
+            if (inChar == '\n') {
+                return line.toString();
+            }
+            line.append(inChar);
+        }
+    }
+
+    /**
+     * Reads exactly length bytes, turning each byte into one char.
+     *
+     * @return the string, or null if the stream ended before length bytes arrived
+     * @throws IOException if the socket could not be read
+     * @throws IllegalStateException if the message outgrows MAX_MESSAGE_LENGTH
+     */
+    public String readString(int length) throws IOException {
+        prepareInput(); //no-op when the stream is already there; avoids an NPE otherwise
+        //a buffer per call, so an aborted read cannot leak into the next one
+        StringBuffer text = new StringBuffer();
+        while (text.length() < length) {
+            int inInt = socketInput.read(); //TODO speed up message reading by using a receiving buffer
+            if (inInt == -1) {
+                return null;
+            }
+            if (text.length() > MAX_MESSAGE_LENGTH - 1) {
+                throw new IllegalStateException("Message exceeds max length: " + text);
+            }
+            text.append((char) inInt);
+        }
+        return text.toString();
+    }
+
+    public void prepareOutput() throws IOException {
+        if (socketOutput == null) {
+            socketOutput = socket.getOutputStream();
+        }
+    }
+
+    public void prepareInput() throws IOException {
+        if (socketInput == null) {
+            socketInput = socket.getInputStream();
+        }
+    }
+}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsyncServer.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsyncServer.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsyncServer.java 2007-10-10 23:31:38 UTC (rev 490)
@@ -0,0 +1,51 @@
+package com.ogoglio.message.proto;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * {@link AsyncProtoServer} on top of a plain ServerSocket; every accepted socket is
+ * handed out wrapped in a {@link SimpleSocketAsync}.
+ */
+public class SimpleSocketAsyncServer implements AsyncProtoServer {
+
+    private ServerSocket serverSocket;
+
+    /**
+     * Opens the server socket.
+     *
+     * @param port the port to listen on; -1 is passed to ServerSocket as 0
+     * @throws IOException if the server socket could not be opened
+     */
+    public SimpleSocketAsyncServer(int port) throws IOException {
+        int bindPort = (port == -1) ? 0 : port;
+        serverSocket = new ServerSocket(bindPort);
+    }
+
+    /**
+     * Blocks in accept() until a client connects, then wraps the client's socket.
+     *
+     * @return the protocol object for the client that connected
+     * @throws IOException if accept() failed
+     */
+    public AsyncProto waitForClient() throws IOException {
+        return new SimpleSocketAsync(serverSocket.accept());
+    }
+
+    /**
+     * Closes the server socket.
+     *
+     * @throws IOException if the close failed
+     */
+    public void shutdown() throws IOException {
+        serverSocket.close();
+    }
+
+    /**
+     * @return the local port of the server socket
+     */
+    public int getLocalPort() {
+        return serverSocket.getLocalPort();
+    }
+}
Modified: maven/trunk/ogoglio-server/pom.xml
===================================================================
--- maven/trunk/ogoglio-server/pom.xml 2007-10-08 12:51:24 UTC (rev 489)
+++ maven/trunk/ogoglio-server/pom.xml 2007-10-10 23:31:38 UTC (rev 490)
@@ -262,10 +262,16 @@
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
- <version>2.3</version>
+ <version>2.5</version>
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.tomcat</groupId>
+ <artifactId>catalina</artifactId>
+ <version>6.0.14</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>rhino</groupId>
<artifactId>js</artifactId>
<version>1.6R6</version>
Added: maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/CometServlet.java
===================================================================
--- maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/CometServlet.java (rev 0)
+++ maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/CometServlet.java 2007-10-10 23:31:38 UTC (rev 490)
@@ -0,0 +1,197 @@
+package com.ogoglio.site;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.catalina.CometEvent;
+import org.apache.catalina.CometProcessor;
+
+import com.ogoglio.util.Log;
+
+/**
+ * Comet endpoint that keeps the response of every open request and has a background
+ * thread write each incoming "sender:message" to all of them.
+ */
+public class CometServlet extends HttpServlet implements CometProcessor {
+
+    //open responses; always accessed while holding the list's own lock
+    protected ArrayList connections = new ArrayList();
+
+    protected MessageSender messageSender = null;
+
+    /**
+     * Starts the daemon thread that delivers queued messages.
+     */
+    public void init() throws ServletException {
+        messageSender = new MessageSender();
+        Thread messageSenderThread = new Thread(messageSender, "MessageSender[" + getServletContext() + "]");
+        messageSenderThread.setDaemon(true);
+        messageSenderThread.start();
+    }
+
+    /**
+     * Forgets every connection and asks the sender thread to stop.
+     */
+    public void destroy() {
+        synchronized (connections) {
+            connections.clear();
+        }
+        messageSender.stop();
+        messageSender = null;
+    }
+
+    /**
+     * Process the given Comet event.
+     *
+     * @param event The Comet event that will be processed
+     * @throws IOException
+     * @throws ServletException
+     */
+    public void event(CometEvent event) throws IOException, ServletException {
+        HttpServletRequest request = event.getHttpServletRequest();
+        HttpServletResponse response = event.getHttpServletResponse();
+        if (event.getEventType() == CometEvent.EventType.BEGIN) {
+            //event.setTimeout(60000);
+            Log.info("Begin for session: " + request.getSession(true).getId());
+            PrintWriter writer = response.getWriter();
+            response.setContentType("text/plain");
+            writer.println("<!doctype html public \"-//w3c//dtd html 4.0 transitional//en\">");
+            writer.println("<head><title>JSP Chat</title></head><body bgcolor=\"#FFFFFF\">");
+            writer.flush();
+            synchronized (connections) {
+                connections.add(response);
+            }
+            //drainDataBuffer(event, request);
+        } else if (event.getEventType() == CometEvent.EventType.ERROR) {
+            Log.error("Error for session: " + request.getSession(true).getId()+"-->"+event.getEventSubType());
+            if (event.getEventSubType()==CometEvent.EventSubType.TIMEOUT) {
+                Log.info("Timeout ignored!");
+                return;
+            }
+            synchronized (connections) {
+                connections.remove(response);
+            }
+            event.close();
+        } else if (event.getEventType() == CometEvent.EventType.END) {
+            Log.info("End for session: " + request.getSession(true).getId());
+            synchronized (connections) {
+                connections.remove(response);
+            }
+            PrintWriter writer = response.getWriter();
+            writer.println("</body></html>");
+            event.close();
+        } else if (event.getEventType() == CometEvent.EventType.READ) {
+            drainDataBuffer(event, request);
+        }
+    }
+
+    /**
+     * Reads whatever the request has available and queues each chunk, expected to look
+     * like "sender:message", for delivery. A chunk without a ':' is logged and dropped.
+     */
+    private void drainDataBuffer(CometEvent event, HttpServletRequest request) throws IOException {
+        InputStream is = request.getInputStream();
+        byte[] buf = new byte[512];
+        do {
+            int n = is.read(buf); //can throw an IOException
+            if (n > 0) {
+                //only the first n bytes are fresh; the rest of buf is left over from before
+                String msg=new String(buf, 0, n);
+                Log.info("Read " + n + " bytes: " + msg + " for session: " + request.getSession(true).getId());
+                int colon=msg.indexOf(':');
+                if (colon < 0) {
+                    Log.error("Ignoring data without a sender:"+msg);
+                } else {
+                    messageSender.send(msg.substring(0,colon), msg.substring(colon+1));
+                }
+            } else if (n < 0) {
+                Log.error("Read error:"+event.getEventType()+"," +request.getRequestURI()+","+n);
+                return;
+            }
+        } while (is.available() > 0);
+    }
+
+    /**
+     * Queue of outgoing messages plus the loop that writes them to every connection.
+     */
+    public class MessageSender implements Runnable {
+
+        //volatile because stop() is called from a thread other than the one in run()
+        protected volatile boolean running = true;
+
+        protected ArrayList messages = new ArrayList();
+
+        public MessageSender() {
+        }
+
+        /**
+         * Asks run() to finish, waking it up if it is waiting for messages.
+         */
+        public void stop() {
+            running = false;
+            synchronized (messages) {
+                messages.notifyAll();
+            }
+        }
+
+        /**
+         * Add message for sending.
+         */
+        public void send(String user, String message) {
+            Log.info("FART:adding"+message +" from "+user);
+            synchronized (messages) {
+                messages.add("[" + user + "]: " + message);
+                messages.notify();
+            }
+        }
+
+        /**
+         * Waits for messages and writes each batch to all the open connections, until
+         * stop() is called or the thread is interrupted.
+         */
+        public void run() {
+            while (running) {
+                String[] pendingMessages = null;
+                synchronized (messages) {
+                    //test and wait under the same lock so a send() cannot slip in between
+                    while (running && messages.isEmpty()) {
+                        try {
+                            messages.wait();
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            return;
+                        }
+                    }
+                    pendingMessages = (String[]) messages.toArray(new String[0]);
+                    messages.clear();
+                }
+                if (pendingMessages.length == 0) {
+                    continue; //woken up by stop(), nothing to deliver
+                }
+
+                synchronized (connections) {
+                    // Send any pending message on all the open connections
+                    Log.info("FART currently need to send on "+connections.size()+" connections with "+pendingMessages.length);
+                    for (int i = 0; i < connections.size(); i++) {
+                        try {
+                            PrintWriter writer = ((HttpServletResponse) connections.get(i)).getWriter();
+                            for (int j = 0; j < pendingMessages.length; j++) {
+                                writer.println(pendingMessages[j] + "<br>");
+                            }
+                            writer.flush();
+                        } catch (IOException e) {
+                            Log.error("IOExeption sending message", e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
Modified: maven/trunk/ogoglio-server/src/main/webapp/WEB-INF/web.xml
===================================================================
--- maven/trunk/ogoglio-server/src/main/webapp/WEB-INF/web.xml 2007-10-08 12:51:24 UTC (rev 489)
+++ maven/trunk/ogoglio-server/src/main/webapp/WEB-INF/web.xml 2007-10-10 23:31:38 UTC (rev 490)
@@ -30,6 +30,12 @@
<load-on-startup>1</load-on-startup>
</servlet>
+ <servlet>
+ <servlet-name>CometServlet</servlet-name>
+ <servlet-class>com.ogoglio.site.CometServlet</servlet-class>
+ <load-on-startup>1</load-on-startup>
+ </servlet>
+
<servlet-mapping>
<servlet-name>AccountServlet</servlet-name>
<url-pattern>/account/*</url-pattern>
@@ -55,6 +61,11 @@
<url-pattern>/media/*</url-pattern>
</servlet-mapping>
+ <servlet-mapping>
+ <servlet-name>CometServlet</servlet-name>
+ <url-pattern>/comet/*</url-pattern>
+ </servlet-mapping>
+
<error-page>
<error-code>404</error-code>
<location>/notFound.html</location>
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|