|
From: <tre...@us...> - 2008-03-11 23:18:50
|
Revision: 789
http://ogoglio.svn.sourceforge.net/ogoglio/?rev=789&view=rev
Author: trevorolio
Date: 2008-03-11 16:18:53 -0700 (Tue, 11 Mar 2008)
Log Message:
-----------
Pleased as punch to check in the first contribution from Matt Kimmel at the Electric Sheep Company (http://www.electricsheepcompany.com/)
Matt's introduced a way to plug in new message codecs and wire protocols for world events. Not all scenarios work well with XML over Comet, so being able to negotiate between different protocols depending on firewalls and applications will be a Good Thing.
Thanks, Matt!
Modified Paths:
--------------
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProto.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/AsyncProtoFactory.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometClient.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/CometProto.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/Locator.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/proto/SimpleSocketAsync.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncClientReady.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoServerFactory.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/AsyncProtoShutdownHandle.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometChannelManager.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/CometServlet.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/NetworkChannelServer.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/SimpleSocketAsyncServer.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/message/server/SimpleSocketWaiterThread.java
maven/trunk/ogoglio-server/src/main/java/com/ogoglio/site/MessageProxy.java
Added Paths:
-----------
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/ChunkedStreamNetworkTransporter.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageEncoder.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStack.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStackFactory.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStackType.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessagingException.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/NetworkTransporter.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/NullWireFormatEncoder.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/StreamNetworkTransporter.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/WireFormatEncoder.java
maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/XMLMessageEncoder.java
maven/trunk/ogoglio-common/src/test/java/com/ogoglio/message/
maven/trunk/ogoglio-common/src/test/java/com/ogoglio/message/plugin/
maven/trunk/ogoglio-common/src/test/java/com/ogoglio/message/plugin/test/
maven/trunk/ogoglio-common/src/test/java/com/ogoglio/message/plugin/test/ChunkedStreamNetworkTransporterTest.java
maven/trunk/ogoglio-common/src/test/java/com/ogoglio/message/plugin/test/MessageStackTest.java
maven/trunk/ogoglio-common/src/test/java/com/ogoglio/message/plugin/test/NullWireFormatEncoderTest.java
maven/trunk/ogoglio-common/src/test/java/com/ogoglio/message/plugin/test/StreamNetworkTransporterTest.java
maven/trunk/ogoglio-common/src/test/java/com/ogoglio/message/plugin/test/XMLMessageEncoderTest.java
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java 2008-03-11 17:41:45 UTC (rev 788)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/client/SpaceClient.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -13,6 +13,17 @@
limitations under the License. */
package com.ogoglio.client;
+import com.ogoglio.client.model.*;
+import com.ogoglio.message.*;
+import com.ogoglio.message.plugin.MessageStackFactory;
+import com.ogoglio.util.ArgumentUtils;
+import com.ogoglio.util.Log;
+import com.ogoglio.util.WebConstants;
+import com.ogoglio.xml.*;
+
+import javax.media.j3d.Transform3D;
+import javax.vecmath.Color3f;
+import javax.vecmath.Point3d;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -22,45 +33,6 @@
import java.util.Vector;
import java.util.zip.ZipInputStream;
-import javax.media.j3d.Transform3D;
-import javax.vecmath.Color3f;
-import javax.vecmath.Point3d;
-
-import com.ogoglio.client.model.Attachment;
-import com.ogoglio.client.model.BodyConfiguration;
-import com.ogoglio.client.model.BodyDataProvider;
-import com.ogoglio.client.model.Door;
-import com.ogoglio.client.model.Page;
-import com.ogoglio.client.model.Shape;
-import com.ogoglio.client.model.Space;
-import com.ogoglio.client.model.SplinePath;
-import com.ogoglio.client.model.Template;
-import com.ogoglio.client.model.TemplateDataProvider;
-import com.ogoglio.client.model.Thing;
-import com.ogoglio.client.model.User;
-import com.ogoglio.message.Message;
-import com.ogoglio.message.MessageHandler;
-import com.ogoglio.message.NoSuchDestinationException;
-import com.ogoglio.message.PayloadFactory;
-import com.ogoglio.message.TCPChannel;
-import com.ogoglio.message.proto.AsyncProtoFactory;
-import com.ogoglio.util.ArgumentUtils;
-import com.ogoglio.util.Log;
-import com.ogoglio.util.WebConstants;
-import com.ogoglio.xml.AccountDocument;
-import com.ogoglio.xml.AttachmentDocument;
-import com.ogoglio.xml.BodyConfigurationDocument;
-import com.ogoglio.xml.BodyDataDocument;
-import com.ogoglio.xml.BodySettingDocument;
-import com.ogoglio.xml.DoorDocument;
-import com.ogoglio.xml.PageDocument;
-import com.ogoglio.xml.ShapeDocument;
-import com.ogoglio.xml.SpaceDocument;
-import com.ogoglio.xml.SpaceEvent;
-import com.ogoglio.xml.TemplateDocument;
-import com.ogoglio.xml.ThingDocument;
-import com.ogoglio.xml.UserDocument;
-
public class SpaceClient implements UserInputListener, Space.Context {
private static final long START_UP_WAIT_MS = 30000;
@@ -117,7 +89,7 @@
space = new Space(this, spaceDoc.getSpaceID(), spaceDoc.getDisplayName(), spaceDoc.getOwnerUsername(), spaceDoc.getDisplaySea(), spaceDoc.getSeaLevel(), backgroundColor);
//create the event channel and start queuing events
- messageChannel = new TCPChannel(AsyncProtoFactory.getDefaultClient(descriptor, true), messenger, true, new ChannelListener(), true, "space-client");
+ messageChannel = new TCPChannel(MessageStackFactory.getDefaultStack(descriptor, true), messenger, true, new ChannelListener(), true, "space-client");
messenger.authenticate(authCookie);
long startWait = System.currentTimeMillis();
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java 2008-03-11 17:41:45 UTC (rev 788)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/MeasPerfClient.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -1,46 +1,48 @@
package com.ogoglio.message;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import com.ogoglio.message.plugin.MessageStack;
-import com.ogoglio.message.proto.AsyncProto;
-import com.ogoglio.message.proto.AsyncProtoFactory;
+import java.util.*;
-
public class MeasPerfClient extends Thread implements MessageHandler, TCPChannel.Listener {
- private static final double THRESHOLD_DANGER= 2.0; //make a note if it takes longer than this
- private static final int MAX_SLEEP_TIME_MS= 5000; //avg 2.5secs
- private static final int RAMP_UP_TIME= 250; //time btwn two thread creations
+ private static final double THRESHOLD_DANGER = 2.0; //make a note if it takes longer than this
+
+ private static final int MAX_SLEEP_TIME_MS = 5000; //avg 2.5secs
+
+ private static final int RAMP_UP_TIME = 250; //time btwn two thread creations
+
private static final int THRESHOLD_TERMINATE = 10; //we start culling senders if we see this many over THRESHOLD_DANGER
-
+
private TCPChannel channel;
- private static Map timeMap=Collections.synchronizedMap(new HashMap());
- private static Map senderMap=new HashMap();
- private static List deathList=Collections.synchronizedList(new ArrayList());
+ private static Map timeMap = Collections.synchronizedMap(new HashMap());
+
+ private static Map senderMap = new HashMap();
+
+ private static List deathList = Collections.synchronizedList(new ArrayList());
+
private static double totalTime = 0.0;
- private static double totalSamples=0.0;
- private static Object totalTimeLock=new Object();
- private static int warningCounter =0;
+ private static double totalSamples = 0.0;
+
+ private static Object totalTimeLock = new Object();
+
+ private static int warningCounter = 0;
+
private static Object warningCounterLock = new Object();
-
- private Random r=new Random();
-
+
+ private Random r = new Random();
+
public static void main(String[] argv) {
int n = Integer.parseInt(argv[0]);
try {
for (int i = 0; i < n; ++i) {
-// AsyncProto proto=AsyncProtoFactory.getDefaultClient(argv[1], AsyncProtoFactory.getDefaultInfo().getSimSpecificSelector());
- AsyncProto proto=null;/*AsyncProtoFactory.getDefaultClient(argv[1], "/fart/poop");*/
- Thread t =new MeasPerfClient(proto);
+ // AsyncProto proto=AsyncProtoFactory.getDefaultClient(argv[1], AsyncProtoFactory.getDefaultInfo().getSimSpecificSelector());
+ // AsyncProto proto=null;/*AsyncProtoFactory.getDefaultClient(argv[1], "/fart/poop");*/
+ MessageStack stack = null;
+ Thread t = new MeasPerfClient(stack);
t.start();
try {
Thread.sleep(RAMP_UP_TIME);
@@ -53,26 +55,26 @@
}
}
- public MeasPerfClient(AsyncProto proto) {
- System.out.println("FOUND A CLIENT PROTO:"+(proto!=null));
- channel = new TCPChannel(proto,this,false,this,true,"measure-perf-client");
+ public MeasPerfClient(MessageStack stack) {
+ System.out.println("FOUND A CLIENT PROTO:" + (stack != null));
+ channel = new TCPChannel(stack, this, false, this, true, "measure-perf-client");
}
public void run() {
while (true) {
- Payload p ;
+ Payload p;
Message m;
-
+
try {
- Long me=new Long(Thread.currentThread().getId());
+ Long me = new Long(Thread.currentThread().getId());
while (true) {
- p=new PayloadFactory.HeartbeatPayload();
- long id=r.nextLong();
- m=new Message(channel.getLocalLocator(),channel.getRemoteLocator(),id,p);
- timeMap.put(new Long(id),new Long(System.currentTimeMillis()));
- senderMap.put(new Long(id),me);
+ p = new PayloadFactory.HeartbeatPayload();
+ long id = r.nextLong();
+ m = new Message(channel.getLocalLocator(), channel.getRemoteLocator(), id, p);
+ timeMap.put(new Long(id), new Long(System.currentTimeMillis()));
+ senderMap.put(new Long(id), me);
channel.sendMessage(m);
- int ms=r.nextInt(MAX_SLEEP_TIME_MS);
+ int ms = r.nextInt(MAX_SLEEP_TIME_MS);
Thread.yield();
Thread.sleep(ms);
synchronized (deathList) {
@@ -82,7 +84,7 @@
}
}
//did somebody close this?
- if (channel==null) {
+ if (channel == null) {
return;
}
}
@@ -95,34 +97,34 @@
}
public void handleMessage(Message message, TCPChannel sourceChannel) throws NoSuchDestinationException {
- Long id=new Long(message.getSpaceID());
- long now=System.currentTimeMillis();
+ Long id = new Long(message.getSpaceID());
+ long now = System.currentTimeMillis();
if (!timeMap.containsKey(id)) {
- System.out.println("Can't find id in time map! Ignoring id:"+id);
+ System.out.println("Can't find id in time map! Ignoring id:" + id);
return;
}
- Long before=(Long)timeMap.get(id);
- long diff = now-before.longValue();
- double secs = ((double)diff)/1000.0;
+ Long before = (Long) timeMap.get(id);
+ long diff = now - before.longValue();
+ double secs = ((double) diff) / 1000.0;
synchronized (totalTimeLock) {
- totalTime+=secs;
- totalSamples+=1.0;
+ totalTime += secs;
+ totalSamples += 1.0;
}
- if (r.nextInt(100)==0) {
+ if (r.nextInt(100) == 0) {
double avg;
synchronized (totalTimeLock) {
- avg=totalTime/totalSamples;
+ avg = totalTime / totalSamples;
}
- System.out.println(""+ channel.getLocalLocator() +" Sample Turnaround time on heartbeat:"+secs+" and avg:"+avg);
- }
- if (secs>THRESHOLD_DANGER) {
- boolean cull=false;
-
+ System.out.println("" + channel.getLocalLocator() + " Sample Turnaround time on heartbeat:" + secs + " and avg:" + avg);
+ }
+ if (secs > THRESHOLD_DANGER) {
+ boolean cull = false;
+
synchronized (warningCounterLock) {
warningCounter++;
- if (warningCounter>THRESHOLD_TERMINATE) {
- cull=true;
- warningCounter=0;
+ if (warningCounter > THRESHOLD_TERMINATE) {
+ cull = true;
+ warningCounter = 0;
}
}
//not enough to convince us to blow up somebody?
@@ -130,10 +132,10 @@
return;
}
//ok, the sender of the message should die if it took too long...
- Long die = (Long)senderMap.get(id);
+ Long die = (Long) senderMap.get(id);
synchronized (deathList) {
if (!deathList.contains(die)) {
- System.out.println("Killing sender "+die+" because of too much time elapsed:"+secs);
+ System.out.println("Killing sender " + die + " because of too much time elapsed:" + secs);
deathList.add(die);
}
}
@@ -141,6 +143,6 @@
}
public void channelClosed(TCPChannel channel) {
- System.out.println("Got a message about closing channel:"+channel.hashCode()+ " vs. "+this.channel.hashCode());
+ System.out.println("Got a message about closing channel:" + channel.hashCode() + " vs. " + this.channel.hashCode());
}
}
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java 2008-03-11 17:41:45 UTC (rev 788)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/SenderQueue.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -14,26 +14,25 @@
package com.ogoglio.message;
+import com.ogoglio.message.plugin.MessageStack;
+import com.ogoglio.util.BlockingQueue;
+
import java.io.IOException;
-import com.ogoglio.message.proto.AsyncProto;
-import com.ogoglio.util.BlockingQueue;
-import com.ogoglio.util.Log;
-
public class SenderQueue {
private SenderThread senderThread = new SenderThread();
private BlockingQueue messageQueue = new BlockingQueue();
- private AsyncProto clientProto;
+ private MessageStack clientStack;
private boolean cleaned = false;
- public SenderQueue(AsyncProto clientProto, int maxSize) {
+ public SenderQueue(MessageStack clientStack, int maxSize) {
messageQueue.setMaxSize(maxSize);
- this.clientProto = clientProto;
+ this.clientStack = clientStack;
try {
- this.clientProto.prepareOutput();
+ this.clientStack.prepareOutput();
} catch (IOException e) {
throw new IllegalStateException("Could not get socket output stream: " + e);
}
@@ -46,7 +45,7 @@
public void cleanup() {
cleaned = true;
- clientProto.shutdown();
+ clientStack.shutdown();
if (messageQueue != null) {
messageQueue.close();
}
@@ -57,10 +56,11 @@
}
private void unsafeSendMessage(Message message) {
- String messageString = message.toString();
- Command command = new Command(Command.MESSAGE, messageString.length());
+// String messageString = message.toString();
+// Command command = new Command(Command.MESSAGE, messageString.length());
try {
- clientProto.sendMessage(command.toString(),messageString);
+// clientStack.sendMessage(command.toString(),messageString);
+ clientStack.sendMessage(message);
} catch (IOException e) {
if (!cleaned) {
throw new IllegalStateException("Error writing to client:" + message);
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java 2008-03-11 17:41:45 UTC (rev 788)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPChannel.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -14,15 +14,15 @@
package com.ogoglio.message;
-import com.ogoglio.message.proto.AsyncProto;
+import com.ogoglio.message.plugin.MessageStack;
import com.ogoglio.message.proto.Locator;
-import com.ogoglio.util.Log;
import com.ogoglio.util.BlockingQueue.QueueClosedException;
import com.ogoglio.util.BlockingQueue.QueueOverflowException;
+import com.ogoglio.util.Log;
public class TCPChannel implements TCPMessageReader.Listener {
- private AsyncProto clientProto = null;
+ private MessageStack clientStack = null;
// private String remoteHostName = null;
@@ -40,12 +40,12 @@
private boolean ensureOrigin = false;
- public TCPChannel(AsyncProto proto, MessageHandler message_handler, boolean ensureOrigin,
+ public TCPChannel(MessageStack stack, MessageHandler message_handler, boolean ensureOrigin,
Listener listener, boolean needAReaderThread, String debugInfo) {
- this.clientProto= proto;
- //remoteHostName = clientProto.getRemoteAddress().getAddress().getHostAddress();
- //remoteHostPort = clientProto.getRemoteAddress().getPort();
+ this.clientStack = stack;
+ //remoteHostName = clientStack.getRemoteAddress().getAddress().getHostAddress();
+ //remoteHostPort = clientStack.getRemoteAddress().getPort();
if (message_handler == null) {
throw new IllegalArgumentException("bad message handler " + message_handler);
}
@@ -57,14 +57,14 @@
this.listener = listener;
// TODO Don't spand two threads for each socket! No effing way!
- senderQueue = new SenderQueue(clientProto, 1000); //TODO what should the max queue size be?
+ senderQueue = new SenderQueue(clientStack, 1000); //TODO what should the max queue size be?
senderQueue.start(debugInfo);
//this is a wee bit hairy. all clients need a reader thread. servers need a reader
//thread depending on their protocol.
if (needAReaderThread) {
- readerThread = new TCPMessageReader(clientProto, message_handler, this);
+ readerThread = new TCPMessageReader(clientStack, message_handler, this);
readerThread.setName("tcp-reader-"+debugInfo+"-"+readerThread.getId());
readerThread.start();
}
@@ -72,19 +72,19 @@
this.messageHandler=message_handler;
}
- public void clientReady(AsyncProto newlyConnectedProto) {
- Log.debug("Client connected from: "+newlyConnectedProto.getRemoteLocator());
+ public void clientReady(MessageStack newlyConnectedStack) {
+ Log.debug("Client connected from: "+newlyConnectedStack.getRemoteLocator());
}
public interface Listener {
public void channelClosed(TCPChannel channel);
}
public Locator getLocalLocator() {
- return clientProto.getLocalLocator();
+ return clientStack.getLocalLocator();
}
public Locator getRemoteLocator() {
- return clientProto.getRemoteLocator();
+ return clientStack.getRemoteLocator();
}
public void cleanup() {
@@ -108,26 +108,26 @@
public void sendMessage(Message message) throws NoSuchDestinationException {
if(message.getProxy() != null) {
- if (!message.getProxy().equals(clientProto.getRemoteLocator())) {
+ if (!message.getProxy().equals(clientStack.getRemoteLocator())) {
throw new NoSuchDestinationException("Passed a message to a TCPChannel with the wrong proxy: " + message.getProxy());
}
- } else if (!message.getDestination().equals(clientProto.getRemoteLocator())){
- throw new NoSuchDestinationException("Passed a message to a TCPChannel with the wrong destination: " + message.getDestination() + " but should be " + clientProto.getRemoteLocator());
+ } else if (!message.getDestination().equals(clientStack.getRemoteLocator())){
+ throw new NoSuchDestinationException("Passed a message to a TCPChannel with the wrong destination: " + message.getDestination() + " but should be " + clientStack.getRemoteLocator());
}
try {
senderQueue.sendMessage(message);
} catch (QueueOverflowException e) {
- Log.error("Queue overflow: " + clientProto.getRemoteLocator(),e);
+ Log.error("Queue overflow: " + clientStack.getRemoteLocator(),e);
cleanup();
} catch (QueueClosedException e) {
- Log.error("Queue closed: " + clientProto.getRemoteLocator(),e);
+ Log.error("Queue closed: " + clientStack.getRemoteLocator(),e);
cleanup();
}
}
public String toString() {
- return "TCPChannel from " + clientProto.getLocalLocator() + " to " +clientProto.getRemoteLocator();
+ return "TCPChannel from " + clientStack.getLocalLocator() + " to " + clientStack.getRemoteLocator();
}
public void socketClosed() {
Modified: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java 2008-03-11 17:41:45 UTC (rev 788)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/TCPMessageReader.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -14,12 +14,12 @@
package com.ogoglio.message;
-import java.io.IOException;
-
-import com.ogoglio.message.proto.AsyncProto;
+import com.ogoglio.message.plugin.MessageStack;
import com.ogoglio.message.proto.NegativeReadValueException;
import com.ogoglio.util.Log;
+import java.io.IOException;
+
public class TCPMessageReader extends Thread {
private boolean cleaned = false;
@@ -32,16 +32,16 @@
//private int remotePort = -1;
- private AsyncProto clientProto=null;
+ private MessageStack clientStack =null;
- public TCPMessageReader(AsyncProto clientProto, MessageHandler messageHandler, TCPChannel channel) {
+ public TCPMessageReader(MessageStack clientStack, MessageHandler messageHandler, TCPChannel channel) {
super("TCPMessageReader");
setDaemon(true);
- if (clientProto == null) {
- throw new IllegalArgumentException("bad protocol to TCPMessageReader" + clientProto);
+ if (clientStack == null) {
+ throw new IllegalArgumentException("bad protocol to TCPMessageReader" + clientStack);
}
- //remoteHostName = clientProto.getRemoteAddress().getAddress().getHostAddress();
- //remotePort = clientProto.getRemoteAddress().getPort();
+ //remoteHostName = clientStack.getRemoteAddress().getAddress().getHostAddress();
+ //remotePort = clientStack.getRemoteAddress().getPort();
if (messageHandler == null) {
throw new IllegalArgumentException("bad message handler: " + messageHandler);
}
@@ -50,10 +50,10 @@
throw new IllegalArgumentException("bad listener " + channel);
}
this.channel = channel;
- this.clientProto = clientProto;
+ this.clientStack = clientStack;
try {
- this.clientProto.prepareInput();
+ this.clientStack.prepareInput();
} catch (IOException e) {
throw new IllegalStateException("Couldn't get client socket input stream " + e);
}
@@ -67,28 +67,27 @@
public void cleanup() {
cleaned = true;
- clientProto.shutdown();
+ clientStack.shutdown();
}
public void run() {
try {
while (!cleaned) {
- String msg=clientProto.readLine();
- if (msg!=null) {
- //Log.info("TCP Message Reader:"+msg);
- Message message = Message.parseMessage(msg);
- if (channel.ensureOrigin()) {
- message.setOrigin(clientProto.getRemoteLocator());
- //message.getOrigin().setHost(remoteHostName);
- //message.getOrigin().setPort(remotePort);
- }
- try {
- messageHandler.handleMessage(message, channel);
- } catch (Throwable e) {
- Log.error("Error handling our message",e);
- e.printStackTrace();
- }
+// String msg=clientStack.readLine();
+ //Log.info("TCP Message Reader:"+msg);
+// Message message = Message.parseMessage(msg);
+ Message message = clientStack.readMessage();
+ if (channel.ensureOrigin()) {
+ message.setOrigin(clientStack.getRemoteLocator());
+ //message.getOrigin().setHost(remoteHostName);
+ //message.getOrigin().setPort(remotePort);
}
+ try {
+ messageHandler.handleMessage(message, channel);
+ } catch (Throwable e) {
+ Log.error("Error handling our message",e);
+ e.printStackTrace();
+ }
}
} catch (NegativeReadValueException e) {
//Log.info("Negative value from read, assuming server closed connection!");
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/ChunkedStreamNetworkTransporter.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/ChunkedStreamNetworkTransporter.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/ChunkedStreamNetworkTransporter.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -0,0 +1,132 @@
+/* Copyright 2008, The Electric Sheep Company, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. */
+package com.ogoglio.message.plugin;
+
+import com.ogoglio.message.proto.NegativeReadValueException;
+import com.ogoglio.util.Log;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * A NetworkTransporter plugin class that uses arbitrary input/output streams to read and write
+ * network traffic. HTTP chunking information is written and parsed in order to communicate message
+ * lengths.
+ *
+ * @author Matt Kimmel
+ */
+public class ChunkedStreamNetworkTransporter extends StreamNetworkTransporter {
+ /**
+ * Construct an instance of ChunkedStreamNetworkTransporter using the given streams.
+ *
+ * @param input Stream to read incoming network traffic from.
+ * @param output Stream to write outgoing network traffic to.
+ */
+ public ChunkedStreamNetworkTransporter(InputStream input, OutputStream output) {
+ super(input, output);
+ }
+
+ public void sendNetworkMessage(byte[] message) throws MessagingException, IOException {
+ // Throw exception on null data
+ if (message == null) {
+ throw new MessagingException("null data passed to sendNetworkMessage");
+ }
+
+ // Encode the message length into a message header.
+ byte[] messageHeader = encodeMessageLength(message.length);
+
+ // Get length of data chunk and encode it.
+ // Note that we add the size of the message header.
+ int dataLength = message.length + messageHeader.length;
+ String lengthHexStr = Integer.toHexString(dataLength);
+
+ // Now that we know the length of the data length string, allocate the buffer.
+ // The size is the length of the hex data plus the length of the buffer plus
+ // 4 bytes for the two \r\n pairs plus the size of the message header (which is
+ // redundant but necessary when sending data to a Comet server).
+ int bufferLength = lengthHexStr.length() + message.length + messageHeader.length + 4;
+ byte[] dataBuffer = new byte[bufferLength];
+
+ // Copy in the length string
+ int offset = 0;
+ byte[] lengthHexBytes = lengthHexStr.getBytes();
+ System.arraycopy(lengthHexBytes, 0, dataBuffer, offset, lengthHexBytes.length);
+ offset += lengthHexBytes.length;
+
+ // Insert the first \r\n
+ dataBuffer[offset] = '\r';
+ dataBuffer[offset + 1] = '\n';
+ offset += 2;
+
+ // Write in the message size
+ System.arraycopy(messageHeader, 0, dataBuffer, offset, messageHeader.length);
+ offset += messageHeader.length;
+
+ // Copy in the data
+ System.arraycopy(message, 0, dataBuffer, offset, message.length);
+ offset += message.length;
+
+ // Add the final \r\n
+ dataBuffer[offset] = '\r';
+ dataBuffer[offset + 1] = '\n';
+
+ // Send the message
+ outputStream.write(dataBuffer);
+ outputStream.flush();
+ }
+
+ public byte[] receiveNetworkMessage() throws MessagingException, IOException {
+ // Read the length of the chunk, blocking if necessary.
+ StringBuilder messageLengthBuffer = new StringBuilder();
+ int b = inputStream.read();
+ while ((b >= 0) && ((char) b != '\r')) {
+ messageLengthBuffer.append((char) b);
+ b = inputStream.read();
+ }
+ if (b < 0) {
+ Log.info("Negative number returned by InputStream.read in receiveNetworkMessage--possible disconnection");
+ throw new NegativeReadValueException("Got a negative number from InputStream.read in ChunkedStreamNetworkTransporter");
+ }
+ b = inputStream.read(); // Eat the \n
+
+ // Parse the chunked message length
+ int overallMessageLength = Integer.parseInt(messageLengthBuffer.toString(), 16);
+
+ // Read in the message length header to get the true message length.
+ int messageLength = decodeMessageLength();
+
+ // Now allocate a buffer of the proper size and read the data into it,
+ // blocking if necessary.
+ byte[] buffer = new byte[messageLength];
+ if (messageLength > 0) {
+ int bytesRead = inputStream.read(buffer);
+ if (bytesRead != messageLength) {
+ throw new MessagingException("Amount of data read does not match message length");
+ }
+ }
+
+ // Eat the closing \r\n
+ b = inputStream.read();
+ b = inputStream.read();
+
+ // If the message length is 0, we've been disconnected. Throw a
+ // NegativeReadValueException.
+ if (messageLength == 0) {
+ throw new NegativeReadValueException("EOC received in ChunkedStreamNetworkTransporter");
+ }
+
+ return buffer;
+ }
+}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageEncoder.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageEncoder.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageEncoder.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -0,0 +1,44 @@
+/* Copyright 2008, The Electric Sheep Company, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. */
+package com.ogoglio.message.plugin;
+
+import com.ogoglio.message.Message;
+
+/**
+ * This interface must be implemented by any class that wishes to act as a plug-in at the message encoding
+ * layer of the messaging stack. Its implementors are responsible for encoding and decoding Message objects,
+ * including their Payloads and other information.
+ *
+ * @author Matt Kimmel
+ */
+public interface MessageEncoder {
+ /**
+ * Encodes the given Message object into a byte array suitable to be passed to the WireFormatEncoder,
+ * and suitable to be decoded by the decodeMessage method.
+ *
+ * @param message Message to encode.
+ * @return Byte array containing the encoded representation of the message.
+ * @throws MessagingException if the message cannot be encoded.
+ */
+ public byte[] encodeMessage(Message message) throws MessagingException;
+
+ /**
+ * Decode a message as encoded by encodeMessage into a Message object.
+ *
+ * @param messageData Byte array representation of message, as produced by encodeMessage.
+ * @return Decoded Message object.
+ * @throws MessagingException if the message cannot be decoded.
+ */
+ public Message decodeMessage(byte[] messageData) throws MessagingException;
+}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStack.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStack.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStack.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -0,0 +1,242 @@
+/* Copyright 2008, The Electric Sheep Company, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. */
+package com.ogoglio.message.plugin;
+
+import com.ogoglio.message.Message;
+import com.ogoglio.message.proto.Locator;
+
+import java.io.IOException;
+
+/**
+ * This class encapsulates an entire pluggable message stack--a MessageEncoder, a WireFormatEncoder,
+ * and a NetworkTransporter. It provides an interface for sending and receiving Message objects using
+ * the stack, as well as an interface for sending and receiving raw data (at the WireFormatEncoder
+ * level).
+ *
+ * MessageStack is beginning its lifespan as a more-or-less drop-in replacement for AsyncProto.
+ * The two interfaces will probably diverge over time.
+ *
+ * @author Matt Kimmel
+ */
+public class MessageStack {
+ /**
+ * Local locator.
+ */
+ protected Locator localLocator;
+
+ /**
+ * Remote locator.
+ */
+ protected Locator remoteLocator;
+
+ /**
+ * MessageEncoder plugin for this stack.
+ */
+ protected MessageEncoder messageEncoder;
+
+ /**
+ * WireFormatEncoder plugin for this stack.
+ */
+ protected WireFormatEncoder wireFormatEncoder;
+
+ /**
+ * NetworkTransporter plugin for this stack.
+ */
+ protected NetworkTransporter networkTransporter;
+
+ /**
+ * Construct a new MessageStack object from the given parameters.
+ *
+ * @param messageEncoder MessageEncoder object to use in this stack.
+ * @param wireFormatEncoder WireFormatEncoder object to use in this stack.
+ * @param networkTransporter NetworkTransporter object to use in this stack.
+ * @param localLocator Locator for localLocator side of connection.
+ * @param remoteLocator Locator for remoteLocator side of connection.
+ */
+ MessageStack(MessageEncoder messageEncoder, WireFormatEncoder wireFormatEncoder, NetworkTransporter networkTransporter, Locator localLocator, Locator remoteLocator) {
+ this.messageEncoder = messageEncoder;
+ this.wireFormatEncoder = wireFormatEncoder;
+ this.networkTransporter = networkTransporter;
+ this.localLocator = localLocator;
+ this.remoteLocator = remoteLocator;
+ }
+
+ /**
+ * Return the local Locator for this connection. This is a holdover from
+ * AsyncProto; it may be deprecated eventually, or Locater may be decoupled
+ * from URIs.
+ *
+ * @return Local Locator for this connection.
+ */
+ public Locator getLocalLocator() {
+ return localLocator;
+ }
+
+ /**
+ * Return the remote Locator for this connection. This is a holdover from
+ * AsyncProto; it may be deprecated eventually, or Locater may be decoupled
+ * from URIs.
+ *
+ * @return Remote Locator for this connection.
+ */
+ public Locator getRemoteLocator() {
+ return remoteLocator;
+ }
+
+ /**
+ * Send a message to the network, through the stack. The message will be
+ * encoded, the result will be encoded for the wire, and the final message
+ * will be sent using the NetworkTransporter.
+ *
+ * @param message The Message object to send
+ * @throws IOException on a low-level I/O error (typically from the underlying network)
+ */
+ public void sendMessage(Message message) throws IOException {
+ try {
+ // First encode the message
+ byte[] encodedMessage = messageEncoder.encodeMessage(message);
+
+ // Now encode it for the wire
+ byte[] wireEncodedMessage = wireFormatEncoder.encodeForWire(encodedMessage);
+
+ // Finally, send it to the network
+ networkTransporter.sendNetworkMessage(wireEncodedMessage);
+ } catch (MessagingException e) {
+ // TODO: Widen interface to throw this directly
+ throw new IOException("MessagingException encountered in sendMessage: " + e.toString());
+ }
+ }
+
+ /**
+ * Receive a single message from the network, through the stack. The message
+ * will be read from the underlying network, any needed wire-format decoding will
+ * be done, and the message will be decoded from its original encoding and returned.
+ * This method will block until a message is received.
+ *
+ * @return Decoded message. The method will throw an exception rather than return a null Message.
+ * @throws IOException on a low-level I/O error (typically from the underlying network)
+ */
+ public Message readMessage() throws IOException {
+ try {
+ // Read raw data from the network, blocking if necessary
+ byte[] networkData = networkTransporter.receiveNetworkMessage();
+
+ // Decode any wire format from the data
+ byte[] encodedMessage = wireFormatEncoder.decodeFromWire(networkData);
+
+ // Decode the message
+ return messageEncoder.decodeMessage(encodedMessage);
+ } catch (MessagingException e) {
+ // TODO: Widen interface to throw this directly
+ throw new IOException("MessagingException encountered in readMessage: " + e.toString());
+ }
+ }
+
+ /**
+ * Send a series of bytes, unencoded, bypassing the MessageEncoder. This is used
+ * for real-time protocols that don't want the overhead of representing their
+ * messages as objects.
+ *
+ * @param message Bytes to send
+ * @throws IOException on a low-level I/O error (typically from the underlying network)
+ */
+ public void sendUnencodedMessage(byte[] message) throws IOException {
+ try {
+ // Encode the message for the wire
+ byte[] wireEncodedMessage = wireFormatEncoder.encodeForWire(message);
+
+ // Now send it to the network
+ networkTransporter.sendNetworkMessage(wireEncodedMessage);
+ } catch (MessagingException e) {
+ // TODO: Widen interface to throw this directly
+ throw new IOException("MessagingException encountered in sendUnencodedMessage: " + e.toString());
+ }
+ }
+
+ /**
+ * Receive a series of bytes, unencoded, as sent by sendUnencodedMessage. This is used
+ * for real-time protocols that don't want the overhead of representing their
+ * messages as objects.
+ *
+ * @return Unencoded byte array. This method will throw an exception rather than return null.
+ * @throws IOException on a low-level I/O error (typically from the underlying network)
+ */
+ public byte[] readUnencodedMessage() throws IOException {
+ try {
+ // Read raw data from the network, blocking if necessary
+ byte[] networkData = networkTransporter.receiveNetworkMessage();
+
+ // Decode any wire format from the data and return it
+ return wireFormatEncoder.decodeFromWire(networkData);
+ } catch (MessagingException e) {
+ // TODO: Widen interface to throw this directly
+ throw new IOException("MessagingException encountered in readUnencodedMessage: " + e.toString());
+ }
+ }
+
+ /**
+ * Shut down the Message Stack. This will disconnect the NetworkTransporter, if possible.
+ */
+ public void shutdown() {
+ try {
+ networkTransporter.closeNetworkConnection();
+ } catch (IOException e) {
+ // Do nothing with this for now
+ }
+ }
+
+ /**
+ * Attempt to set a new output object for this message stack, assuming the NetworkTransporter
+ * supports it and the object is of a type understood by the NetworkTransporter.
+ *
+ * @param outputObject New output object.
+ * @throws MessagingException if the NetworkTransporter does not support this operation or does not understand the object
+ */
+ public void setOutputObject(Object outputObject) throws MessagingException {
+ // Really just delegated to the NetworkTransporter
+ networkTransporter.setOutputObject(outputObject);
+ }
+
+ /**
+ * Attempt to set a new input object for this message stack, assuming the NetworkTransporter
+ * supports it and the object is of a type understood by the NetworkTransporter.
+ *
+ * @param inputObject New input object.
+ * @throws MessagingException if the NetworkTransporter does not support this operation or does not understand the object
+ */
+ public void setInputObject(Object inputObject) throws MessagingException {
+ // Really just delegated to the NetworkTransporter
+ networkTransporter.setInputObject(inputObject);
+ }
+
+ /**
+ * Check whether data is ready to be read without blocking. Note that this
+ * does not mean that there is an entire message ready to be read without
+ * blocking.
+ *
+ * @return true if data is available; false if not
+ * @throws IOException if the NetworkTransporter does not support this operation or if a low-level I/O error occurs
+ */
+ public boolean isDataReady() throws IOException {
+ return networkTransporter.isDataReady();
+ }
+
+ public void prepareOutput() throws IOException {
+ // Does nothing in this class
+ }
+
+ public void prepareInput() throws IOException {
+ // Does nothing in this class
+ }
+}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStackFactory.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStackFactory.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStackFactory.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -0,0 +1,153 @@
+/* Copyright 2008, The Electric Sheep Company, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. */
+package com.ogoglio.message.plugin;
+
+import com.ogoglio.client.WebAPIDescriptor;
+import com.ogoglio.message.proto.CometClient;
+import com.ogoglio.message.proto.Locator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+
+/**
+ * This is a factory class that produces MessageStacks based on MessageStackTypes and
+ * arbitrary connection objects. At the moment it's a fairly simplistic mechanism for
+ * configuring MessageStacks, but it should evolve over time into a sophisticated and
+ * configurable paradigm.
+ *
+ * The USE_SIMPLE_SOCKET variable and associated methods have been moved here from
+ * AsyncProtoFactory. The default port definitions have been moved here from SimpleSocketAsync.
+ *
+ * @author Matt Kimmel
+ */
+public class MessageStackFactory {
+ /**
+ * This variable determines whether Comet or Simple Socket protocols will be used.
+ */
+ private static final boolean USE_SIMPLE_SOCKET = false;
+
+ /**
+ * Default port to use for message proxy
+ */
+ public static final int DEFAULT_PROXY_PORT = 49355;
+
+ /**
+ * Default port to use for sim
+ */
+ public static final int DEFAULT_SIM_PORT = 8922;
+
+ /**
+ * Create a message stack configured with plugins appropriate to the provided MessageStackType.
+ * Use the arbitrary inputObject and outputObject parameters (which should actually be instances
+ * of objects appropriate to the MessageStack--for example, an InputStream and OutputStream for
+ * a stream-based stack) and the Locators in construction of the stack.
+ *
+ * @param type Enumerated type of MessageStack to build
+ * @param inputObject MessageStack-type appropriate object to use for network input
+ * @param outputObject MessageStack-type appropriate object to use for network output
+ * @param localLocator Locator describing local side of connection
+ * @param remoteLocator Locator describing remote side of connection
+ * @return A fully configured MessageStack object
+ * @throws MessagingException if the MessageStack could not be created
+ */
+ static public MessageStack createMessageStack(MessageStackType type, Object inputObject, Object outputObject, Locator localLocator, Locator remoteLocator) throws MessagingException {
+ MessageEncoder messageEncoder;
+ WireFormatEncoder wireFormatEncoder;
+ NetworkTransporter networkTransporter;
+
+ if (type == MessageStackType.DEFAULT_COMET_CLIENT) {
+ if (!(inputObject instanceof InputStream) || !(outputObject instanceof OutputStream)) {
+ throw new MessagingException("Invalid input/output objects in createMessageStack");
+ }
+
+ messageEncoder = new XMLMessageEncoder();
+ wireFormatEncoder = new NullWireFormatEncoder();
+ networkTransporter = new ChunkedStreamNetworkTransporter((InputStream) inputObject, (OutputStream) outputObject);
+ } else if (type == MessageStackType.DEFAULT_COMET_SERVER) {
+ if (((inputObject != null) && !(inputObject instanceof InputStream)) || ((outputObject != null) && !(outputObject instanceof OutputStream))) {
+ throw new MessagingException("Invalid input/output objects in createMessageStack");
+ }
+
+ messageEncoder = new XMLMessageEncoder();
+ wireFormatEncoder = new NullWireFormatEncoder();
+ networkTransporter = new StreamNetworkTransporter((InputStream) inputObject, (OutputStream) outputObject);
+ } else if (type == MessageStackType.DEFAULT_SIMPLESOCKET) {
+ if (!(inputObject instanceof InputStream) || !(outputObject instanceof OutputStream)) {
+ throw new MessagingException("Invalid input/output objects in createMessageStack");
+ }
+
+ messageEncoder = new XMLMessageEncoder();
+ wireFormatEncoder = new NullWireFormatEncoder();
+ networkTransporter = new StreamNetworkTransporter((InputStream) inputObject, (OutputStream) outputObject);
+ } else {
+ throw new MessagingException("Unknown Message Stack Type passed to createMessageStack");
+ }
+
+ return new MessageStack(messageEncoder, wireFormatEncoder, networkTransporter, localLocator, remoteLocator);
+ }
+
+ static public MessageStack getDefaultStack(WebAPIDescriptor descriptor, boolean wantProxy) throws IOException {
+ if (USE_SIMPLE_SOCKET) {
+ if (wantProxy) {
+ return createSimpleSocketStack(descriptor.getServiceStateURI().getHost(), DEFAULT_PROXY_PORT);
+ } else {
+ return createSimpleSocketStack(descriptor.getServiceStateURI().getHost(), DEFAULT_SIM_PORT);
+ }
+ } else {
+ if (wantProxy) {
+ return CometClient.getStack(descriptor.getCometProxyURI());
+ } else {
+ return CometClient.getStack(descriptor.getCometSimURI());
+ }
+ }
+ }
+
+ static public MessageStack getDefaultStack(URI uri) throws IOException {
+ if (USE_SIMPLE_SOCKET) {
+ return createSimpleSocketStack(uri.getHost(), uri.getPort());
+ } else {
+ return CometClient.getStack(uri);
+ }
+ }
+
+ static public String getScheme() {
+ if (USE_SIMPLE_SOCKET) {
+ return "og";
+ } else {
+ return "comet";
+ }
+ }
+
+ private static MessageStack createSimpleSocketStack(String host, int port) throws IOException {
+ Socket simpleSocket = new Socket(host, port);
+ InetSocketAddress addr = (InetSocketAddress) simpleSocket.getLocalSocketAddress();
+ Locator localLocator = sockAddrToScheme(addr);
+ addr = (InetSocketAddress) simpleSocket.getRemoteSocketAddress();
+ Locator remoteLocator = sockAddrToScheme(addr);
+ try {
+ return createMessageStack(MessageStackType.DEFAULT_SIMPLESOCKET, simpleSocket.getInputStream(), simpleSocket.getOutputStream(), localLocator, remoteLocator);
+ } catch (MessagingException e) {
+ throw new IOException("Caught MessagingException while constructing stack: " + e.toString());
+ }
+ }
+
+ static public Locator sockAddrToScheme(InetSocketAddress addr) {
+ String scheme = getScheme();
+ return new Locator(scheme + "://" + addr.getHostName() + ":" + addr.getPort());
+ }
+}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStackType.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStackType.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessageStackType.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -0,0 +1,91 @@
+/* Copyright 2008, The Electric Sheep Company, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. */
+package com.ogoglio.message.plugin;
+
+/**
+ * This is a Java 1.4-style enum class, enumerating the configurations of MessageStack
+ * that MessageStackFactory will build.
+ *
+ * @author Matt Kimmel
+ */
+public final class MessageStackType {
+ /**
+ * Default Ogoglio client-side stack (XML/Null/Chunked)
+ */
+ static public final MessageStackType DEFAULT_COMET_CLIENT = new MessageStackType("DEFAULT_COMET_CLIENT", 0);
+
+ /**
+ * Default Ogoglio server-side stack (XML/Null/Comet)
+ */
+ static public final MessageStackType DEFAULT_COMET_SERVER = new MessageStackType("DEFAULT_COMET_SERVER", 1);
+
+ /**
+ * Default Ogoglio "simple socket" stack (XML/Null/SizedData)
+ */
+ static public final MessageStackType DEFAULT_SIMPLESOCKET = new MessageStackType("DEFAULT_SIMPLESOCKET", 2);
+
+ /**
+ * Array listing all possible MessageStackTypes (used by valueOf methods below).
+ */
+ static public MessageStackType[] values = {DEFAULT_COMET_CLIENT, DEFAULT_COMET_SERVER, DEFAULT_SIMPLESOCKET};
+
+ /**
+ * Name of this MessageStackType--should be the same as its identifier.
+ */
+ private final String name;
+
+ /**
+ * Ordinal value of this MessageStackType--must be unique.
+ */
+ private final int ordinal;
+
+ /**
+ * Construct a MessageStackType from a name and ordinal, both of which must be unique.
+ *
+ * @param name Name of MessageStackType--should be the same as its identifier.
+ * @param ordinal Ordinal value of MessageStackType--must be unique.
+ */
+ private MessageStackType(String name, int ordinal) {
+ this.name = name;
+ this.ordinal = ordinal;
+ }
+
+ public String toString() {
+ return name;
+ }
+
+ public int getOrdinal() {
+ return ordinal;
+ }
+
+ static public MessageStackType valueOf(String name) {
+ for (int i = 0; i < values.length; ++i) {
+ if (name.equals(values[i].toString())) {
+ return values[i];
+ }
+ }
+
+ return null;
+ }
+
+ static public MessageStackType valueOf(int ordinal) {
+ for (int i = 0; i < values.length; ++i) {
+ if (ordinal == values[i].getOrdinal()) {
+ return values[i];
+ }
+ }
+
+ return null;
+ }
+}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessagingException.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessagingException.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/MessagingException.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -0,0 +1,37 @@
+/* Copyright 2008, The Electric Sheep Company, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. */
+package com.ogoglio.message.plugin;
+
+/**
+ * Base exception thrown by various methods in message plugin interfaces when encoding/decoding fails.
+ *
+ * @author Matt Kimmel
+ */
+public class MessagingException extends Exception {
+ public MessagingException() {
+ super();
+ }
+
+ public MessagingException(String message) {
+ super(message);
+ }
+
+ public MessagingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MessagingException(Throwable cause) {
+ super(cause);
+ }
+}
Added: maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/NetworkTransporter.java
===================================================================
--- maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/NetworkTransporter.java (rev 0)
+++ maven/trunk/ogoglio-common/src/main/java/com/ogoglio/message/plugin/NetworkTransporter.java 2008-03-11 23:18:53 UTC (rev 789)
@@ -0,0 +1,84 @@
+/* Copyright 2008, The Electric Sheep Company, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. */
+package com.ogoglio.message.plugin;
+
+import java.io.IOException;
+
+/**
+ * This interface must be implemented by any class that wishes to act as a plug-in at the Network Trans...
[truncated message content] |