|
From: Peter P. <pr...@us...> - 2007-07-20 13:22:21
|
Update of /cvsroot/pyxida/Pyxida/lib
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv27511/lib

Modified Files:
    asyncj.jar
Log Message:
Updated handling of config parameters

Index: asyncj.jar
===================================================================
RCS file: /cvsroot/pyxida/Pyxida/lib/asyncj.jar,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
Binary files /tmp/cvsGVivwZ and /tmp/cvsHHrccI differ
|
|
From: Peter P. <pr...@us...> - 2007-07-20 13:22:21
|
Update of /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/nc
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv27511/src/edu/harvard/syrah/pyxida/nc

Modified Files:
    NCManager.java
Log Message:
Updated handling of config parameters

Index: NCManager.java
===================================================================
RCS file: /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/nc/NCManager.java,v
retrieving revision 1.38
retrieving revision 1.39
diff -C2 -d -r1.38 -r1.39
*** NCManager.java 30 Mar 2007 18:03:27 -0000 1.38
--- NCManager.java 20 Jul 2007 13:22:18 -0000 1.39
***************
*** 39,43 ****
      // This allows the size on the wire to remain constant
      // which is important for Azureus integration.
!     String bootstrapList[] = Config.getProperty(
          "bootstraplist",
          "righthand.eecs.harvard.edu lefthand.eecs.harvard.edu").split("[\\s]");
--- 39,43 ----
      // This allows the size on the wire to remain constant
      // which is important for Azureus integration.
!     String bootstrapList[] = Config.getProperty(
          "bootstraplist",
          "righthand.eecs.harvard.edu lefthand.eecs.harvard.edu").split("[\\s]");
***************
*** 712,716 ****
          neighbor = getUpNeighbor();
      }
!     assert (neighbor != null);
      return neighbor;
  }
--- 712,716 ----
          neighbor = getUpNeighbor();
      }
!     //assert (neighbor != null) : "Could not find a bootstrap neighbour";
      return neighbor;
  }
|
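For readers following the NCManager change, here is a minimal sketch of reading a whitespace-separated bootstrap list from a property with a default value. It is an illustration only: `java.util.Properties` stands in for the project's `Config` class, and `BootstrapListSketch` and `parseBootstrapList` are hypothetical names. Unlike the `split("[\\s]")` call in the diff, which splits on every single whitespace character, the sketch splits on runs of whitespace and drops empty entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class BootstrapListSketch {
    // Default taken from the diff above.
    static final String DEFAULT_HOSTS =
        "righthand.eecs.harvard.edu lefthand.eecs.harvard.edu";

    // Hypothetical helper: Properties is a stand-in for the project's Config class.
    static List<String> parseBootstrapList(Properties props) {
        String raw = props.getProperty("bootstraplist", DEFAULT_HOSTS);
        List<String> hosts = new ArrayList<String>();
        // Split on runs of whitespace so repeated spaces do not yield empty host names.
        for (String host : raw.trim().split("\\s+")) {
            if (!host.isEmpty()) {
                hosts.add(host);
            }
        }
        return hosts;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstraplist", "a.example.org   b.example.org");
        System.out.println(parseBootstrapList(props));
    }
}
```

Because the second hunk comments out the assertion, callers of the neighbour lookup may now receive `null` and would presumably need to check for it.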
|
From: Peter P. <pr...@us...> - 2007-07-20 13:22:18
|
Update of /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv27511/src/edu/harvard/syrah/pyxida

Modified Files:
    Pyxida.java
Log Message:
Updated handling of config parameters

Index: Pyxida.java
===================================================================
RCS file: /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/Pyxida.java,v
retrieving revision 1.24
retrieving revision 1.25
diff -C2 -d -r1.24 -r1.25
*** Pyxida.java 30 Mar 2007 18:03:27 -0000 1.24
--- Pyxida.java 20 Jul 2007 13:22:18 -0000 1.25
***************
*** 27,36 ****
       * All config properties in the file must start with 'pyxida.'
       */
!     Config.read("pyxida", System.getProperty("config", "config/pyxida.cfg"));
    }

-   private static final String CONFIG_FILE = System.getProperty("config",
-       "config/pyxida.cfg");
-
    // Imperial blocks ports outside of 55000-56999
    public static final int COMM_PORT = Integer.parseInt(Config.getConfigProps().getProperty("port", "55504"));
--- 27,33 ----
       * All config properties in the file must start with 'pyxida.'
       */
!     Config.read("pyxida", System.getProperty("pyxida.config", "config/pyxida.cfg"));
    }

    // Imperial blocks ports outside of 55000-56999
    public static final int COMM_PORT = Integer.parseInt(Config.getConfigProps().getProperty("port", "55504"));
***************
*** 50,55 ****
    public static Random random;

!   private Pyxida() {
!   }

    private void init(final CB0 cbDone) {
--- 47,51 ----
    public static Random random;

!   private Pyxida() { /* empty */ }

    private void init(final CB0 cbDone) {
|
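The Pyxida.java change renames the system property that selects the config file from `config` to `pyxida.config`, so the file would be chosen at launch with something like `-Dpyxida.config=/path/to/file`. The sketch below shows that lookup, plus one possible way a reader could honour the "must start with 'pyxida.'" rule. The prefix handling is an assumption: the diff does not show what `Config.read` does internally, and `ConfigPathSketch` and `stripPrefix` are hypothetical names.

```java
import java.util.Properties;

class ConfigPathSketch {
    // Same lookup as the new revision: system property first, then the default path.
    static String configPath() {
        return System.getProperty("pyxida.config", "config/pyxida.cfg");
    }

    // Hypothetical: keep only keys that start with "<prefix>." and drop that prefix,
    // so "pyxida.port" becomes "port". Not taken from the project's Config class.
    static Properties stripPrefix(Properties in, String prefix) {
        Properties out = new Properties();
        String lead = prefix + ".";
        for (String key : in.stringPropertyNames()) {
            if (key.startsWith(lead)) {
                out.setProperty(key.substring(lead.length()), in.getProperty(key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("config file: " + configPath());
        Properties raw = new Properties();
        raw.setProperty("pyxida.port", "55504");
        System.out.println("port: " + stripPrefix(raw, "pyxida").getProperty("port"));
    }
}
```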
|
From: Peter P. <pr...@us...> - 2007-06-15 16:13:24
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv20481/src/edu/harvard/syrah/sbon/async

Modified Files:
    Sync.java
Log Message:
Sync now returns Errors as Results

Index: Sync.java
===================================================================
RCS file: /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/Sync.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** Sync.java 4 Jun 2007 17:55:41 -0000 1.1
--- Sync.java 15 Jun 2007 16:13:23 -0000 1.2
***************
*** 26,39 ****
          super.run();

!         cbBlocking.callOK();

          EL.get().registerTimerCB(new CB0() {
!           protected void cb(CBResult result) {
!             cbDone.callOK();
!           }
!         });
        }
      };
      blockingThread.run();
    }
--- 26,45 ----
          super.run();

!         try {
!           cbBlocking.callOK();
!         } catch (final Error e) {
!           // Return any exceptions
!           EL.get().registerTimerCB(new CB0() {
!             protected void cb(CBResult result) { cbDone.call(CBResult.ERROR(new Exception(e))); }
!           });
!           return;
!         }

          EL.get().registerTimerCB(new CB0() {
!           protected void cb(CBResult result) { cbDone.call(result); }
          });
        }
      };
+
      blockingThread.run();
    }
|
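The pattern in the Sync.java change (run the blocking step, catch any `Error`, and hand it to the completion callback as an error result instead of letting it escape) can be sketched without the AsyncJ classes. Everything here is a simplified stand-in: `Result`, `Done` and `runBlocking` are hypothetical names, and the callback is invoked directly rather than being posted back to the event loop as the diff does with `registerTimerCB`.

```java
class SyncErrorSketch {
    /** Minimal stand-in for a callback result; not the project's CBResult. */
    static final class Result {
        final boolean ok;
        final Exception error;

        private Result(boolean ok, Exception error) {
            this.ok = ok;
            this.error = error;
        }

        static Result success() { return new Result(true, null); }
        static Result failure(Exception e) { return new Result(false, e); }
    }

    /** Hypothetical completion callback. */
    interface Done {
        void call(Result result);
    }

    // Run the blocking step; report an Error through the callback instead of throwing.
    static void runBlocking(Runnable blocking, Done done) {
        try {
            blocking.run();
        } catch (Error e) {
            // Wrap the Error in an Exception, as the diff does with new Exception(e).
            done.call(Result.failure(new Exception(e)));
            return;
        }
        done.call(Result.success());
    }

    public static void main(String[] args) {
        Done printer = new Done() {
            public void call(Result r) {
                System.out.println(r.ok ? "OK" : "ERROR: " + r.error.getCause());
            }
        };
        runBlocking(new Runnable() { public void run() { /* no failure */ } }, printer);
        runBlocking(new Runnable() {
            public void run() { throw new AssertionError("boom"); }
        }, printer);
    }
}
```

Note that only `Error` is caught, mirroring the diff; a `RuntimeException` thrown by the blocking step would still propagate.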
|
From: Peter P. <pr...@us...> - 2007-06-15 16:13:24
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/http
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv20481/src/edu/harvard/syrah/sbon/async/comm/http

Modified Files:
    HTTPComm.java
Log Message:
Sync now returns Errors as Results

Index: HTTPComm.java
===================================================================
RCS file: /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/http/HTTPComm.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** HTTPComm.java 4 Jun 2007 17:55:41 -0000 1.1
--- HTTPComm.java 15 Jun 2007 16:13:23 -0000 1.2
***************
*** 972,975 ****
--- 972,981 ----
        EL.get().registerTimerCB(new CB0() {
          protected void cb(CBResult resultOK) {
+
+           /* Remove trailing slash */
+
+           if (uri.endsWith("/"))
+             uri = uri.substring(0, uri.length() - 1);
+
            final HTTPCallbackHandler httpRequestHandler = getRequestHandler(uri);

|
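The HTTPComm.java hunk normalises the request URI before the handler lookup so that `/xmlrpc/` and `/xmlrpc` resolve to the same handler. A small sketch of that idea follows, with a plain map standing in for the handler registry; `UriNormalizeSketch`, `stripTrailingSlash` and `lookup` are hypothetical names. One deliberate difference from the diff: the sketch leaves a bare `/` untouched, whereas the committed code would turn it into an empty string.

```java
import java.util.HashMap;
import java.util.Map;

class UriNormalizeSketch {
    // Drop one trailing slash, but keep the root path "/" as it is
    // (this guard is an addition; the diff has no such check).
    static String stripTrailingSlash(String uri) {
        if (uri.length() > 1 && uri.endsWith("/")) {
            return uri.substring(0, uri.length() - 1);
        }
        return uri;
    }

    // Hypothetical registry lookup: handlers are keyed by path without a trailing slash.
    static String lookup(Map<String, String> handlers, String uri) {
        return handlers.get(stripTrailingSlash(uri));
    }

    public static void main(String[] args) {
        Map<String, String> handlers = new HashMap<String, String>();
        handlers.put("/xmlrpc", "xmlrpc-handler");
        System.out.println(lookup(handlers, "/xmlrpc/"));
        System.out.println(lookup(handlers, "/xmlrpc"));
    }
}
```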
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:41
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/xmlrpc In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21526/src/edu/harvard/syrah/sbon/async/comm/xmlrpc Added Files: XMLRPCCallbackHandlerIF.java XMLRPCCB.java XMLRPCCallbackInvoker.java ExampleRPCHandler.java XMLRPCComm.java XMLRPCCommIF.java Log Message: Initial commit of AsyncJ project needed for Pyxida --- NEW FILE: XMLRPCComm.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 9, 2005 */ package edu.harvard.syrah.sbon.async.comm.xmlrpc; import java.io.*; import java.net.MalformedURLException; import java.net.UnknownHostException; import java.util.*; import org.apache.xmlrpc.*; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.prp.NetUtil; import edu.harvard.syrah.sbon.async.*; import edu.harvard.syrah.sbon.async.CallbacksIF.*; import edu.harvard.syrah.sbon.async.comm.AddressFactory; import edu.harvard.syrah.sbon.async.comm.AddressIF; import edu.harvard.syrah.sbon.async.comm.http.*; /** * * Implementation of the XMLRPC Communication module * * TODO - add proper connection management with keepAlives (!!!) 
- test code * that handles queued requests * */ public class XMLRPCComm extends HTTPComm implements XMLRPCCommIF { protected static final Log log = new Log(XMLRPCComm.class); private static final String HTTP_ENCODING = "US-ASCII"; private static final String XMLRPC_PATH = "/xmlrpc"; protected Map<String, Object> xmlRPCHandler = new HashMap<String, Object>(); protected XMLRPCServers xmlRPCServers = new XMLRPCServers(); // Constructor is protected protected XmlRpcRequestProcessor requestProcessor = new XmlRpcRequestProcessor() { /* empty */ }; protected XmlRpcResponseProcessor responseProcessor = new XmlRpcResponseProcessor() { /* empty */ }; public XMLRPCComm() { super(); } public void initServer(AddressIF bindAddress, CB0 cbInit) { log.main("Binding to address=" + bindAddress + " with localAddress=" + localNetAddress); super.initServer(bindAddress, XmlRpc.getKeepAlive(), cbInit); } /* * @see edu.harvard.syrah.sbon.comm.XMLRPCCommIF#call(java.lang.String, * java.lang.Object[]) */ public void call(String urlString, String methodName, Object... args) throws MalformedURLException, UnknownHostException { call(urlString, methodName, null, args); } /* * @see edu.harvard.syrah.sbon.comm.XMLRPCCommIF#call(java.lang.String, * java.util.Vector, edu.harvard.syrah.sbon.comm.XMLRPCCB) */ public void call(String urlString, String methodName, final XMLRPCCB cb, Object... 
args) throws MalformedURLException, UnknownHostException { log.debug("Calling XMLRPC method " + methodName + " with args=" + args + " and url=" + urlString); XmlRpcRequest XmlRpcRequest = new XmlRpcRequest(methodName, new Vector<Object>( Arrays.asList(args))); XmlRpcClientRequestProcessor xmlRpcClientRequestProcess = new XmlRpcClientRequestProcessor(); byte[] request = null; try { request = xmlRpcClientRequestProcess.encodeRequestBytes(XmlRpcRequest, HTTP_ENCODING); } catch (XmlRpcClientException e) { log.error("Could not encode client request:" + e); } sendHTTPRequest(urlString, new String(request), XmlRpc.getKeepAlive(), new HTTPCB() { protected void cb(CBResult result, Integer resultCode, String httpResponse, String httpData) { switch (result.state) { case OK: { log.debug("Invoking XmlRpcWorker"); // XmlRpc.debug = true; XmlRpcClientResponseProcessor xmlRpcClientResponseProcessor = new XmlRpcClientResponseProcessor(); Object resultObject = null; InputStream is = new ByteArrayInputStream(NetUtil.toHTTPBytes(httpData)); try { resultObject = xmlRpcClientResponseProcessor.decodeResponse(is); } catch (XmlRpcClientException e) { log.error("Could not decode result: " + e); } log.debug("Got a resultObject."); // do the callback if (!(resultObject instanceof XmlRpcException)) { cb.call(CBResult.OK(), resultObject); } else { cb.call(CBResult.ERROR("XmlRpcException"), resultObject); } break; } case TIMEOUT: case ERROR: { log.debug("Error calling xmlrpc method"); cb.call(result, null); break; } } } }); } /* * @see edu.harvard.syrah.sbon.comm.XMLRPCCommIF#registerHandler(java.lang.String, * java.lang.Object) */ public void registerXMLRPCHandler(String handlerName, Object handler) { xmlRPCHandler.put(handlerName, new XMLRPCCallbackInvoker(handler)); registerHandler(XMLRPC_PATH, new HTTPCallbackHandler() { protected void cb(CBResult result, AddressIF remoteAddr, String path, String httpRequest, final CB2<String, byte[]> cbHTTPResponse) { log.debug("Invoking XmlRpcWorker"); // 
log.debug("buffer.remaining=" + buffer.remaining()); // log.debug("buffer=" + Util.printChar(buffer)); // XmlRpc.debug = true; InputStream is = new ByteArrayInputStream(NetUtil.toHTTPBytes(httpRequest)); XmlRpcServerRequest request = requestProcessor.decodeRequest(is); if (request == null) { byte[] error = responseProcessor.encodeException(new Exception( "No XMLRPC method invocation specified"), requestProcessor.getEncoding()); cbHTTPResponse.call(CBResult.OK(), "text/xml", error); return; } Object handler = null; try { handler = xmlRPCServers.getHandler(request.getMethodName()); } catch (Exception e1) { String errorString = "Could not get object handler for " + request.getMethodName() + ". e=" + e1; byte[] error = responseProcessor.encodeException(new Exception( errorString), requestProcessor.getEncoding()); cbHTTPResponse.call(CBResult.OK(), "text/xml", error); return; } if (!(handler instanceof XMLRPCCallbackHandlerIF)) log.error("Can only invoke XMLRPCCallbackHandlers."); XMLRPCCallbackHandlerIF xmlRPCCallbackHandler = (XMLRPCCallbackHandlerIF) handler; log.debug("Calling xmlrpchandler..."); try { xmlRPCCallbackHandler.execute(request.getMethodName(), request.getParameters(), new CB1<Object>() { public void cb(CBResult result, Object responseObject) { switch (result.state) { case OK: { log.debug("Received a response from server. responseObject=" + responseObject); byte[] response = null; try { response = responseProcessor.encodeResponse(responseObject, requestProcessor.getEncoding()); } catch (UnsupportedEncodingException e) { log.error("Unsupported encoding: " + e); } catch (IOException e) { log.error("Could not encode response: " + e); } catch (XmlRpcException e) { log.error("Could not encode response: " + e); } cbHTTPResponse.call(result, "text/xml", response); break; } case TIMEOUT: case ERROR: { log.warn("Received an error response from server. 
" + result); byte[] error = responseProcessor.encodeException(new Exception( result.toString()), requestProcessor.getEncoding()); cbHTTPResponse.call(CBResult.OK(), "text/xml", error); break; } } } }); } catch (Exception e) { log.warn("Received an exception from server. e=" + e.toString()); byte[] error = responseProcessor.encodeException(e, requestProcessor.getEncoding()); log.debug("error.length=" + error.length); cbHTTPResponse.call(CBResult.OK(), "text/xml", error); } } }); } public void deregisterXMLRPCHandler(String handlerName) { throw new UnsupportedOperationException(); } class XMLRPCServers implements XmlRpcHandlerMapping { /* * @see org.apache.xmlrpc.XmlRpcHandlerMapping#getHandler(java.lang.String) */ public Object getHandler(String arg0) throws Exception { Object handler = xmlRPCHandler.get(arg0.substring(0, arg0.indexOf('.'))); if (handler == null) throw new Exception("Handler for " + arg0 + " not found."); return handler; } } public static void main(String[] args) { EventLoopIF eventLoop = new EL(); EL.set(eventLoop); log.info("Testing the implementation of XMLRPCCommIF."); XMLRPCCommIF apiComm = new XMLRPCComm(); int port = 16182; AddressIF apiAddress = AddressFactory.createLocalhost(port); apiComm.registerXMLRPCHandler("examples", new ExampleRPCHandler()); apiComm.initServer(apiAddress, null); /* * log.main("Making XMLRPC call..."); try { * //apiComm.call("http://localhost:16181/", "examples.getStateName", new * XMLRPCCB() { * //apiComm.call("http://xmlrpc.usefulinc.com/demo/server.php", * "system.listMethods", new XMLRPCCB() { * apiComm.call("http://www.mirrorproject.com/xmlrpc/", "mirror.Random", new * XMLRPCCB() { * * public void cb(Object arg0) { log.info("Result of call is: " + arg0); } * * }); } catch (MalformedURLException e1) { log.error("Wrong URL: " + e1); } * catch (UnknownHostException e2) { log.error("Unknown host: " + e2); } */ log.main("Calling examples.getStateName"); try { apiComm.call("http://localhost:16182/", 
"examples.getStateName", new XMLRPCCB() { public void cb(CBResult result, Object arg0) { log.info("Result of first call is: " + arg0.toString()); } }, 42); } catch (MalformedURLException e1) { log.error("Wrong URL: " + e1); } catch (UnknownHostException e2) { log.error("Unknown host: " + e2); } /* * log.main("Calling http://xmlrpc.usefulinc.com/demo/server.php * system.listMethods"); try { * apiComm.call("http://xmlrpc.usefulinc.com/demo/server.php", * "system.listMethods", new XMLRPCCB() { public void cb(Object arg0) { * log.info("Result of second call is: " + arg0.toString()); } }); } catch * (MalformedURLException e1) { log.error("Wrong URL: " + e1); } catch * (UnknownHostException e2) { log.error("Unknown host: " + e2); } */ EL.get().registerTimerCB(10000, null); EL.get().exit(); EL.get().main(); } } --- NEW FILE: ExampleRPCHandler.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 11, 2005 */ package edu.harvard.syrah.sbon.async.comm.xmlrpc; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.EL; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; /** * */ public class ExampleRPCHandler { private static final Log log = new Log(ExampleRPCHandler.class); public void getStateName(int value, final CB1<Integer> cbInt) { log.info("Called getStateName with value=" + value); EL.get().registerTimerCB(5000, new CB0() { public void cb(CBResult result) { cbInt.call(CBResult.OK(), 16); } }); } } --- NEW FILE: XMLRPCCallbackHandlerIF.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 12, 2005 */ package edu.harvard.syrah.sbon.async.comm.xmlrpc; import java.util.Vector; import org.apache.xmlrpc.XmlRpcHandler; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; /** * * This is a new type of 
XMLRPCHandler that is asynchronous and returns the result with a * callback object. * */ public interface XMLRPCCallbackHandlerIF extends XmlRpcHandler { public void execute (String method, Vector params, CB1<Object> cbObject) throws Exception; } --- NEW FILE: XMLRPCCommIF.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 9, 2005 */ package edu.harvard.syrah.sbon.async.comm.xmlrpc; import java.net.MalformedURLException; import java.net.UnknownHostException; import edu.harvard.syrah.sbon.async.comm.http.HTTPCommIF; /** * * API for Communication layer using XMLRPC * */ public interface XMLRPCCommIF extends HTTPCommIF { public void call(String urlString, String methodName, Object... arguments) throws MalformedURLException, UnknownHostException; public void call(String urlString, String methodName, XMLRPCCB cbResult, Object... arguments) throws MalformedURLException, UnknownHostException; public void registerXMLRPCHandler(String handlerName, Object handler); public void deregisterXMLRPCHandler(String handlerName); } --- NEW FILE: XMLRPCCB.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 11, 2005 */ package edu.harvard.syrah.sbon.async.comm.xmlrpc; import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; /** * */ public abstract class XMLRPCCB extends CB1<Object> { protected abstract void cb(CBResult result, Object httpResponse); } --- NEW FILE: XMLRPCCallbackInvoker.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 12, 2005 */ package edu.harvard.syrah.sbon.async.comm.xmlrpc; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Vector; import org.apache.xmlrpc.XmlRpcException; import edu.harvard.syrah.prp.Log; import 
edu.harvard.syrah.sbon.async.CallbacksIF.CB1; /** * * This is an XMLRPC invoker that knows about callbacks. It is adapted from the * Invoker class in Apache XMLRPC. * */ class XMLRPCCallbackInvoker implements XMLRPCCallbackHandlerIF { private static final Log log = new Log(XMLRPCCallbackInvoker.class); private Object handler; private Class handlerClass; XMLRPCCallbackInvoker(Object handler) { this.handler = handler; this.handlerClass = handler.getClass(); } /* * @see edu.harvard.syrah.sbon.comm.XMLRPCCallbackHandler#execute(java.lang.String, * java.util.Vector, edu.harvard.syrah.sbon.async.CallbacksIF.CB) */ public void execute(String methodName, Vector params, final CB1<Object> cbObject) throws Exception { // Create array with classtype; create ObjectAry with values Class[] argClasses = null; Object[] argValues = null; if (params != null) { argClasses = new Class[params.size() + 1]; argValues = new Object[params.size() + 1]; for (int i = 0; i < params.size(); i++) { argValues[i] = params.elementAt(i); if (argValues[i] instanceof Integer) { argClasses[i] = Integer.TYPE; } else if (argValues[i] instanceof Double) { argClasses[i] = Double.TYPE; } else if (argValues[i] instanceof Boolean) { argClasses[i] = Boolean.TYPE; } else { argClasses[i] = argValues[i].getClass(); } } } // Add callback parameter argClasses[argClasses.length - 1] = CB1.class; argValues[argValues.length - 1] = cbObject; Method method = null; // The last element of the XML-RPC method name is the Java method name. int dot = methodName.lastIndexOf('.'); if (dot > -1 && dot + 1 < methodName.length()) { methodName = methodName.substring(dot + 1); } try { method = handlerClass.getMethod(methodName, argClasses); } catch (NoSuchMethodException e) { throw e; } catch (SecurityException e) { throw e; } // Our policy is to make all public methods callable except the ones defined in java.lang.Object. 
if (method.getDeclaringClass() == Object.class) { throw new XmlRpcException(0, "Invoker can't call methods " + "defined in java.lang.Object"); } try { log.debug("Invoking method=" + method + " of handler=" + handler + " with args=" + argValues); method.invoke(handler, argValues); } catch (IllegalAccessException e) { throw e; } catch (IllegalArgumentException e) { throw e; } catch (InvocationTargetException e) { log.warn("Got an exception=" + e.getTargetException()); e.getTargetException().printStackTrace(); // check whether the thrown exception is XmlRpcException Throwable t = e.getTargetException(); if (t instanceof XmlRpcException) { throw (XmlRpcException) t; } // It is some other exception throw e; } } /* * @see org.apache.xmlrpc.XmlRpcHandler#execute(java.lang.String, * java.util.Vector) */ public Object execute(String method, Vector params) throws Exception { throw new UnsupportedOperationException(); } } |
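The invoker in XMLRPCCallbackInvoker.java resolves a handler method by reflection: boxed `Integer`, `Double` and `Boolean` arguments are matched against primitive parameter types, and a callback is appended as the last parameter. The following is a reduced sketch of that lookup using only `java.lang.reflect`. The `Callback` interface and `Handler` class are hypothetical stand-ins for `CB1<Object>` and `ExampleRPCHandler`, reflective failures are rethrown unchecked for brevity, and, unlike the committed code (which dereferences its argument arrays even when `params` is null), the sketch sizes the arrays for zero parameters.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

class CallbackInvokerSketch {
    /** Hypothetical stand-in for the project's CB1<Object> callback. */
    interface Callback {
        void call(Object value);
    }

    /** Example handler whose method takes a primitive int plus a callback. */
    static class Handler {
        public void getStateName(int value, Callback cb) {
            cb.call("state-" + value);
        }
    }

    // Map boxed XML-RPC values to the primitive types used in handler signatures.
    static Class<?> paramType(Object arg) {
        if (arg instanceof Integer) return Integer.TYPE;
        if (arg instanceof Double) return Double.TYPE;
        if (arg instanceof Boolean) return Boolean.TYPE;
        return arg.getClass();
    }

    static void invoke(Object handler, String rpcName, List<?> params, Callback cb) {
        int n = (params == null) ? 0 : params.size();
        Class<?>[] types = new Class<?>[n + 1];
        Object[] values = new Object[n + 1];
        for (int i = 0; i < n; i++) {
            values[i] = params.get(i);
            types[i] = paramType(values[i]);
        }
        // The callback always travels as the last argument.
        types[n] = Callback.class;
        values[n] = cb;

        // The last element of the XML-RPC method name is the Java method name.
        String javaName = rpcName.substring(rpcName.lastIndexOf('.') + 1);
        try {
            Method method = handler.getClass().getMethod(javaName, types);
            // Refuse methods inherited from java.lang.Object, as the original does.
            if (method.getDeclaringClass() == Object.class) {
                throw new IllegalArgumentException("Cannot call methods of java.lang.Object");
            }
            method.invoke(handler, values);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke " + rpcName, e);
        }
    }

    public static void main(String[] args) {
        invoke(new Handler(), "examples.getStateName", Arrays.asList(42), new Callback() {
            public void call(Object value) {
                System.out.println("result=" + value);
            }
        });
    }
}
```

Mapping boxed values to primitive types means a handler declared with `Integer` instead of `int` would not be found by this lookup; the same holds for the committed invoker.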
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:41
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21526/src/edu/harvard/syrah/sbon/async Added Files: EL.java Barrier.java LoopIt.java CallbacksIF.java Sync.java EventLoopIF.java Loop.java CBResult.java CBQueue.java Config.java Log Message: Initial commit of AsyncJ project needed for Pyxida --- NEW FILE: EL.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 6, 2005 */ package edu.harvard.syrah.sbon.async; import static edu.harvard.syrah.sbon.async.EL.Priority.HIGH; import static edu.harvard.syrah.sbon.async.EL.Priority.LOW; import static edu.harvard.syrah.sbon.async.EL.Priority.NORMAL; import java.io.IOException; import java.nio.channels.*; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.PriorityBlockingQueue; import edu.harvard.syrah.prp.ANSI; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.prp.POut; import edu.harvard.syrah.prp.PTimer; import edu.harvard.syrah.prp.ANSI.Color; import edu.harvard.syrah.sbon.async.CallbacksIF.*; /** * * Implementation of the main event loop. * * This class is modeled after the Java implementation of David Mazieres' * libasync library, which was done by Sean Rhea as part of the Bamboo * implementation. 
* */ public class EL implements EventLoopIF { protected static final Log log = new Log(EL.class); public int SELECTION_KEY_ALL_OPS = SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE; // Maximum time that we can spend servicing the event queue without accessing // the network private static final long MAX_EVENT_QUEUE_PERIOD = 10; // static final long MAX_CB_TIME = 0; // was 5ms // Maximum size for the event queue private static final long EVENT_QUEUE_LIMIT = 1000; private static final boolean DEFAULT_SHOW_CB_TIME = true; private static final long DEFAULT_STATE_DUMP_INTERVAL = 0; private static final boolean DEFAULT_SHOW_IDLE = false; private boolean showIdle; private boolean showCBTime; private static final long NO_SLACKTIME = 0; private static final long INFINITE_SLACKTIME = Long.MAX_VALUE; public enum Priority { HIGH, NORMAL, LOW; } private Queue<CB0> now_eQ_HP = new ConcurrentLinkedQueue<CB0>(); private Queue<CB0> now_eQ_NP = new ConcurrentLinkedQueue<CB0>(); private Queue<CB0> now_eQ_LP = new ConcurrentLinkedQueue<CB0>(); private Queue<CB0> eQ_HP = new PriorityBlockingQueue<CB0>(); private Queue<CB0> eQ_NP = new PriorityBlockingQueue<CB0>(); private Queue<CB0> eQ_LP = new PriorityBlockingQueue<CB0>(); private Selector selector; private Map<SelectableChannel, ChannelSub> channelSubTable = new HashMap<SelectableChannel, ChannelSub>(); private boolean loopExit = false; private boolean forceExit = false; private boolean ranShutdown = false; private int numSelectorKeys; private static EventLoopIF eventLoop = null; private int max_eQ_HP; private int max_eQ_NP; private int max_eQ_LP; private int max_now_eQ_HP; private int max_now_eQ_NP; private int max_now_eQ_LP; public static void set(EventLoopIF newEventLoop) { eventLoop = newEventLoop; } public static EventLoopIF get() { return eventLoop; } public EL() { this(DEFAULT_STATE_DUMP_INTERVAL, DEFAULT_SHOW_IDLE); } public EL(final long stateDumpInterval, final boolean 
showIdle) { Thread.currentThread().setPriority(Thread.MAX_PRIORITY); this.showIdle = showIdle; this.showCBTime = DEFAULT_SHOW_CB_TIME; log.debug("Opening the selector..."); try { selector = Selector.open(); } catch (IOException e) { log.error("Could not open selector: " + e); } /* * TODO * * This is sometimes not executed on shutdown and I don't understand why. * This might be connected to ant...? */ // Make sure that we shutdown cleanly Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { EL.this.shutdown(); } }); if (stateDumpInterval != 0) { this.registerTimerCB(stateDumpInterval, new CB0("ELStateDumper") { protected void cb(CBResult result) { dumpState(true); if (!shouldExit()) EL.this.registerTimerCB(stateDumpInterval, this); } }); } } public void dumpState(boolean eventQueueDump) { log.info("now_eQ_HP.size=" + now_eQ_HP.size() + " now_eQ_HP.max=" + max_now_eQ_HP + (eventQueueDump ? " now_eq_HP=" + POut.toString(now_eQ_HP) : "")); log.info("now_eQ_NP.size=" + now_eQ_NP.size() + " now_eQ_NP.max=" + max_now_eQ_NP + (eventQueueDump ? " now_eq_NP=" + POut.toString(now_eQ_NP) : "")); log.info("now_eQ_LP.size=" + now_eQ_LP.size() + " now_eQ_LP_max=" + max_now_eQ_LP + (eventQueueDump ? " now_eq_LP=" + POut.toString(now_eQ_LP) : "")); log.info("eQ_HP.size=" + eQ_HP.size() + " eQ_HP.max=" + max_eQ_HP + (eventQueueDump ? " eq_HP=" + POut.toString(eQ_HP) : "")); log.info("eQ_NP.size=" + eQ_NP.size() + " eQ_NP.max=" + max_eQ_NP + (eventQueueDump ? " eq_NP=" + POut.toString(eQ_NP) : "")); log.info("eQ_LP.size=" + eQ_LP.size() + " eQ_LP_max=" + max_eQ_LP + (eventQueueDump ? 
" eq_LP=" + POut.toString(eQ_LP) : "")); log.info("cST.size=" + channelSubTable.size() + " cST=" + POut.toString(channelSubTable) + " numSelectorKeys=" + numSelectorKeys); log.info("totalMem=" + POut.toString(((double) Runtime.getRuntime().totalMemory()) / (1024 * 1024)) + " MB" + " freeMem=" + POut.toString(((double) Runtime.getRuntime().freeMemory()) / (1024 * 1024)) + " MB"); // log.main("eQ_NP.peek=" + eQ_NP.remove()); // log.main("eQ_NP.peek=" + eQ_NP.remove()); // log.main("eQ_NP.peek=" + eQ_NP.remove()); max_eQ_HP = eQ_HP.size() > max_eQ_HP ? eQ_HP.size() : max_eQ_HP; max_eQ_NP = eQ_NP.size() > max_eQ_NP ? eQ_NP.size() : max_eQ_NP; max_eQ_LP = eQ_LP.size() > max_eQ_LP ? eQ_LP.size() : max_eQ_LP; max_now_eQ_HP = now_eQ_HP.size() > max_now_eQ_HP ? now_eQ_HP.size() : max_now_eQ_HP; max_now_eQ_NP = now_eQ_NP.size() > max_now_eQ_NP ? now_eQ_NP.size() : max_now_eQ_NP; max_now_eQ_LP = now_eQ_LP.size() > max_now_eQ_LP ? now_eQ_LP.size() : max_now_eQ_LP; } /* * Effectively inserts an event into the event queue * * (non-Javadoc) * * @see edu.harvard.syrah.sbon.async.EventLoopIF#registerTimerEvent(edu.harvard.syrah.sbon.async.CB0) */ public CB0 registerTimerCB(CB0 cb) { return registerTimerCB(cb, Priority.NORMAL); } public CB0 registerTimerCB(CB0 cb, Priority priority) { return registerTimerCB(0, cb, priority); } public CB0 registerTimerCB(long delay, CB0 cb) { return registerTimerCB(delay, cb, Priority.NORMAL); } /* * Acts like delaycb from the libasync C library * * (non-Javadoc) * * @see edu.harvard.syrah.sbon.async.EventLoopIF#registerTimerEvent(long, * edu.harvard.syrah.sbon.async.CB0) */ public CB0 registerTimerCB(long delay, CB0 cb, Priority priority) { cb.ts = System.currentTimeMillis() + delay; Queue<CB0> eventQueue = null; if (delay != 0) { eventQueue = getEventQueue(priority); } else { eventQueue = getNowEventQueue(priority); } eventQueue.add(cb); /* * TODO handle this properly */ /* * if (eventQueue.size() > EVENT_QUEUE_LIMIT) { log.warn("The queue with * 
pri=" + priority + " has grown beyong its limit (" + EVENT_QUEUE_LIMIT + ") * size=" + eventQueue.size() + " cb=" + cb); } */ // Make sure that the selector is not block if this is called from another thread selector.wakeup(); return cb; } public void registerTimerCB(Barrier barrier, CB0 cb) { registerTimerCB(barrier, cb, Priority.NORMAL); } public void registerTimerCB(Barrier barrier, CB0 cb, Priority priority) { registerTimerCB(barrier, 0, cb, priority); } public void registerTimerCB(Barrier barrier, long delay, CB0 cb) { registerTimerCB(barrier, delay, cb, Priority.NORMAL); } public void registerTimerCB(Barrier barrier, long delay, CB0 cb, Priority priority) { barrier.registerTimerCB(delay, cb, priority); } public long deregisterTimerCB(CB0 cb) { return deregisterTimerCB(cb, Priority.NORMAL); } public long deregisterTimerCB(CB0 cb, Priority priority) { assert cb != null; Queue<CB0> eventQueue = getEventQueue(priority); if (!eventQueue.remove(cb)) { log.error("The eventQueue=" + POut.toString(eventQueue) + " doesn't contain the cb=" + cb); } return cb.ts - System.currentTimeMillis(); } public void registerCommCB(SelectableChannel channel, int selectionKey) throws ClosedChannelException { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); assert channelSub != null : "channelSub is null for registerCommCB call. 
channel=" + channel; assert channelSub.checkCommCBs(selectionKey); try { // Register with selector while preserving previous registrations if (channel.keyFor(selector) != null) { // log.debug("Updating key."); channel.register(selector, channel.keyFor(selector).interestOps() | selectionKey, channelSub); } else { // log.debug("Adding new key."); channel.register(selector, selectionKey, channelSub); } } catch (IllegalArgumentException e) { log.error("Illegal key selector mask: " + e + " selectionKey=" + selectionKey); } catch (CancelledKeyException e) { log.warn(("CancelledKeyException channel=" + channel + " e=" + e)); } // log.debug("channelSub: channel=" + channel + " "+ channelSub); } public void deregisterCommCB(SelectableChannel channel, int selectionKey) throws ClosedChannelException { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); // assert channelSub != null; if (channelSub == null) { log.warn("channelSub==null channel=" + channel + " Ignoring."); return; } SelectionKey key = channel.keyFor(selector); assert key != null; // Deregister with selector while preserving previous registrations channel.register(selector, key.interestOps() & (~selectionKey), channelSub); } public void deregisterAllCommCBs(SelectableChannel channel) throws ClosedChannelException { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); // assert channelSub != null : "channel=" + channel; if (channelSub == null) { log.warn("channelSub==null channel=" + channel + " Ignoring."); return; } channel.register(selector, 0, channelSub); } public CB1R<Boolean, SelectionKey> getCommCB(SelectableChannel channel, int selectionKey) { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); // Is this an unknown channel? 
if (channelSub == null) return null; return channelSub.getCommCB(selectionKey); } public void setCommCB(SelectableChannel channel, int selectionKey, CB1R<Boolean, SelectionKey> commCB) { assert channel != null; assert commCB != null; ChannelSub channelSub = channelSubTable.get(channel); if (channelSub == null) { channelSub = new ChannelSub(); channelSubTable.put(channel, channelSub); } channelSub.setCommCBs(selectionKey, commCB); } public void unsetCommCB(SelectableChannel channel, int selectionKey) { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); assert channelSub != null; assert (!channel.keyFor(selector).isValid() || (channel.keyFor(selector) .interestOps() & selectionKey) == 0); channelSub.setCommCBs(selectionKey, null); if (channelSub.isEmpty()) { channelSubTable.remove(channel); } } public void unsetAllCommCBs(SelectableChannel channel) { assert channel != null; ChannelSub channelSub = channelSubTable.get(channel); if (channelSub != null) { /* * TODO This assertion is violated when the key has already been * cancelled. */ // assert channel.keyFor(selector) == null || // (channel.keyFor(selector).interestOps() & SELECTION_KEY_ALL_OPS) == 0; channelSub.setCommCBs(SELECTION_KEY_ALL_OPS, null); channelSubTable.remove(channel); } else { log.warn("Trying to close an already closed channel=" + channel); } } public void main() { log.debug(ANSI.color(Color.LIGHTRED, "Starting main event loop...")); long slackTime; long nextEventTS; // PTimer pt = new PTimer(false); while (!forceExit && !(loopExit && eventQueuesEmpty())) { // log.debug(" Starting new event loop iteration"); // pt.start(); nextEventTS = handleEventQueues(); // pt.stop(log, "handleEventQueues took"); slackTime = (nextEventTS == INFINITE_SLACKTIME) ? INFINITE_SLACKTIME : (nextEventTS - System.currentTimeMillis()); // log.debug("slackTime=" + (slackTime == INFINITE_SLACKTIME ? 
"INF" : // String.valueOf(slackTime)) + " eventQueuesEmpty=" + // eventQueuesEmpty()); if ((!now_eQ_HP.isEmpty() || !now_eQ_NP.isEmpty() || !now_eQ_LP.isEmpty()) && (slackTime == INFINITE_SLACKTIME)) { log.warn("Now_queues have events but slacktime is INF. Resetting."); slackTime = 0; } assert (slackTime != INFINITE_SLACKTIME) || eventQueuesEmpty() : "slackTime=" + slackTime; if (showIdle && slackTime != INFINITE_SLACKTIME && slackTime > 0) log.info("Idle for " + slackTime + " ms"); // now_eQ_NORM.size=" + // now_eQ_NP.size()); if (!forceExit && !loopExit) { // pt.start(); handleSelector(slackTime); // pt.stop(log, "handleSelector (slackTime=" + (slackTime == // INFINITE_SLACKTIME ? "INF" : String.valueOf(slackTime)) + ") took"); // pt.start(); handleSelectCallbacks(); // pt.stop(log, "handleSelectCBs took"); // log.debug("numSelectors=" + numSelectorKeys); } } log.main("Exited main event loop."); } public void forceExit() { forceExit = true; } public void exit() { log.main(ANSI.color(Color.LIGHTRED, "Ready to exit main event loop...")); loopExit = true; } public boolean shouldExit() { return loopExit; } public void handleNetwork() { handleSelector(-1); handleSelectCallbacks(); } public boolean checkChannelState(SelectableChannel channel, int selectionKey) { handleSelector(-1); for (Iterator keyIt = selector.selectedKeys().iterator(); keyIt.hasNext();) { SelectionKey key = (SelectionKey) keyIt.next(); if (key.channel().equals(channel) && key.isValid() && ((key.readyOps() & selectionKey) == selectionKey)) { return true; } } return false; } private boolean eventQueuesEmpty() { return eQ_HP.isEmpty() && eQ_NP.isEmpty() && eQ_LP.isEmpty() && now_eQ_HP.isEmpty() && now_eQ_NP.isEmpty() && now_eQ_LP.isEmpty(); } private Queue<CB0> getEventQueue(Priority priority) { Queue<CB0> eventQueue = null; switch (priority) { case HIGH: { eventQueue = eQ_HP; break; } case NORMAL: { eventQueue = eQ_NP; break; } case LOW: { eventQueue = eQ_LP; break; } default: { log.error("Unknown 
priority"); break; } } return eventQueue; } private Queue<CB0> getNowEventQueue(Priority priority) { Queue<CB0> eventQueue = null; switch (priority) { case HIGH: { eventQueue = now_eQ_HP; break; } case NORMAL: { eventQueue = now_eQ_NP; break; } case LOW: { eventQueue = now_eQ_LP; break; } default: { log.error("Unknown priority"); break; } } return eventQueue; } private long handleEventQueues() { long startTime = 0; long globalNextTS = INFINITE_SLACKTIME; long processingTime = 0; boolean HP_hasSlack = false; boolean NP_hasSlack = false; boolean LP_hasSlack = false; // while ((!(eQ_HP.isEmpty() && now_eQ_HP.isEmpty()) && !HP_hasSlack) // || (!(eQ_NP.isEmpty() && now_eQ_NP.isEmpty()) && !NP_hasSlack) // || (!(eQ_LP.isEmpty() && now_eQ_LP.isEmpty()) && !LP_hasSlack)) { while (((!eQ_HP.isEmpty() && !HP_hasSlack) || !now_eQ_HP.isEmpty()) || ((!eQ_NP.isEmpty() && !NP_hasSlack) || !now_eQ_NP.isEmpty()) || ((!eQ_LP.isEmpty() && !LP_hasSlack) || !now_eQ_LP.isEmpty())) { // log.debug("Executing events..."); long currentTime = System.currentTimeMillis(); if (startTime == 0) startTime = currentTime; processingTime = currentTime - startTime; // log.debug("processing.remaining=" + (MAX_EVENT_QUEUE_PERIOD - // processingTime)); if (processingTime >= MAX_EVENT_QUEUE_PERIOD) { // Make sure that we continue the event queue processing globalNextTS = currentTime; break; } HP_hasSlack = false; NP_hasSlack = false; LP_hasSlack = false; globalNextTS = INFINITE_SLACKTIME; boolean handleNext = true; /* HIGH */ // log.debug("Considering HIGH queue..."); if (!eQ_HP.isEmpty() && handleNext) { long nextEventTS = handleEvent(HIGH, currentTime); if (nextEventTS > currentTime) { HP_hasSlack = now_eQ_HP.isEmpty(); // log.debug("HIGH has slack"); } else { handleNext = false; } globalNextTS = Math.min(nextEventTS, globalNextTS); } // log.debug("Considering now_HIGH queue..."); if (!now_eQ_HP.isEmpty() && handleNext) { long nextEventTS = handleNowEvent(HIGH); handleNext = false; globalNextTS = 
Math.min(nextEventTS, globalNextTS); } /* NORMAL */ // log.debug("Considering NORMAL queue..."); if (!eQ_NP.isEmpty() && handleNext) { long nextEventTS = handleEvent(NORMAL, currentTime); if (nextEventTS > currentTime) { NP_hasSlack = now_eQ_NP.isEmpty(); // log.debug("NORMAL has slack"); } else { handleNext = false; } globalNextTS = Math.min(nextEventTS, globalNextTS); } // log.debug("Considering now_NORMAL queue..."); if (!now_eQ_NP.isEmpty() && handleNext) { long nextEventTS = handleNowEvent(NORMAL); handleNext = false; globalNextTS = Math.min(nextEventTS, globalNextTS); } /* LOW */ // log.debug("Considering LOW queue..."); if (!eQ_LP.isEmpty() && handleNext) { long nextEventTS = handleEvent(LOW, currentTime); if (nextEventTS > currentTime) { LP_hasSlack = now_eQ_LP.isEmpty(); // log.debug("LOW has slack"); } else { handleNext = false; } globalNextTS = Math.min(nextEventTS, globalNextTS); } // log.debug("Considering now_LOW queue..."); if (!now_eQ_LP.isEmpty() && handleNext) { long nextEventTS = handleNowEvent(LOW); handleNext = false; globalNextTS = Math.min(nextEventTS, globalNextTS); } // log.debug("globalNextTS=" + globalNextTS); } // log.debug("globalNextTS=" + (globalNextTS == INFINITE_SLACKTIME ? "INF" : // String.valueOf(globalNextTS)) // + " eventQueueTime=" + (MAX_EVENT_QUEUE_PERIOD - processingTime)); return globalNextTS; } private long handleEvent(Priority priority, long currentTime) { Queue<CB0> eventQueue = getEventQueue(priority); CB0 cb = eventQueue.peek(); long eventSlackTime = cb.ts - currentTime; // log.debug("pri=" + priority + " eventSlackTime=" + eventSlackTime); // Do we need to execute the event at once? 
if (eventSlackTime <= 0) { assert cb != null; PTimer pt = null; if (showCBTime) pt = new PTimer(); // log.debug("cb=" + nextEvent.cb); cb.cb(CBResult.OK()); // pt.stop(log, "Event CB (cb=" + nextEvent.cb + ") took"); if (showCBTime) { pt.stop(); if (MAX_CB_TIME > 0 && pt.getTime() > MAX_CB_TIME) log.warn("cb=" + cb + " took " + pt.toString() + " > " + MAX_CB_TIME + " ms"); } eventQueue.remove(cb); if (!eventQueue.isEmpty()) { cb = eventQueue.peek(); return cb.ts; } return INFINITE_SLACKTIME; } // log.debug("pri=" + priority + " nextEventTS=" + nextEvent.ts); return cb.ts; } private long handleNowEvent(Priority priority) { Queue<CB0> eventQueue = getNowEventQueue(priority); CB0 cb = eventQueue.remove(); assert cb != null; PTimer pt = null; if (showCBTime) pt = new PTimer(); // log.info("now_cb=" + nextEvent.cb); cb.call(CBResult.OK()); // pt.stop(log, "Now_Event CB (cb=" + nextEvent.cb + ") took"); if (showCBTime) { pt.stop(); if (MAX_CB_TIME > 0 && pt.getTime() > MAX_CB_TIME) log.warn("cb=" + cb + " took " + pt.toString() + " > " + MAX_CB_TIME + " ms"); } if (!eventQueue.isEmpty()) { cb = eventQueue.peek(); return cb.ts; } return INFINITE_SLACKTIME; } private void handleSelector(long slackTime) { if (slackTime > 0) { if (slackTime == INFINITE_SLACKTIME) { // Block indefinitely try { // log.debug("Sleeping..." /* + eQ=" + eventQueue.size() */ ); selector.select(); // log.debug("Woken up..."); //assert (eventqueues not empty || forceExit || !selector.selectedKeys().isEmpty()) : "The selector returned without any selected keys. This should never happen."; } catch (IOException e) { log.error("Could not complete select(): " + e); } } else { // Block for slacktime try { // log.debug("Sleeping for " + slackTime + " ms"); selector.select(slackTime); // log.debug("Woken up..."); } catch (IOException e) { log.error("Could not complete select(slackTime): " + e); } } } else { /* * TODO It would make sense to plan in advance here and sleep until the * next event. 
A selectNow would only be necessary if the next event has * to be executed immediately. */ // Do an instant select (non blocking) try { selector.selectNow(); } catch (IOException e) { log.error("Could not complete selectNow(): " + e); } } } private void handleSelectCallbacks() { try { // PTimer pt = new PTimer(); numSelectorKeys = selector.selectedKeys().size(); //log.debug("selectedKeys.size=" + numSelectorKeys); for (Iterator keyIt = selector.selectedKeys().iterator(); keyIt.hasNext();) { SelectionKey key = (SelectionKey) keyIt.next(); CB1R<Boolean, SelectionKey> cbComm = null; // commEvent.ts = System.currentTimeMillis(); ChannelSub channelSub = (ChannelSub) key.attachment(); int readyOps = key.readyOps(); //log.debug("channel=" + key.channel()); if (key.isValid() && key.isAcceptable()) { // log.debug("Handling comms cb with key.isAcceptable()"); if (channelSub.acceptCB != null) { cbComm = channelSub.acceptCB; // PTimer pt = new PTimer(); if (cbComm.cb(CBResult.OK(), key) && key.isValid()) { readyOps &= ~SelectionKey.OP_ACCEPT; } // pt.stop(log, "Accept CB took"); } else { log.debug("acceptCB=null channel=" + key.channel() + " cST=" + POut.toString(channelSubTable)); } } if (key.isValid() && key.isConnectable()) { // log.debug("Handling comms cb with key.isConnectable()"); if (channelSub.connectCB != null) { cbComm = channelSub.connectCB; // PTimer pt = new PTimer(); if (cbComm.cb(CBResult.OK(), key) && key.isValid()) { readyOps &= ~SelectionKey.OP_CONNECT; } // pt.stop(log, "Connect CB took"); } else { log.debug("connectCB=null channel=" + key.channel() + " cST=" + POut.toString(channelSubTable)); } } if (key.isValid() && key.isReadable()) { //log.debug("Handling comms cb with key.isReadable()"); if (channelSub.readCB != null) { cbComm = channelSub.readCB; // PTimer pt = new PTimer(); if (cbComm.cb(CBResult.OK(), key) && key.isValid()) { readyOps &= ~SelectionKey.OP_READ; } // pt.stop(log, "Read CB took"); } else { log.debug("readCB=null channel=" + key.channel() 
+ " cST=" + POut.toString(channelSubTable)); } } if (key.isValid() && key.isWritable()) { // log.debug("Handling comms cb with key.isWritable()"); if (channelSub.writeCB != null) { cbComm = channelSub.writeCB; // PTimer pt = new PTimer(); if (cbComm.cb(CBResult.OK(), key) && key.isValid()) { readyOps &= ~SelectionKey.OP_WRITE; } // pt.stop(log, "Write CB took"); } else { log.debug("writeCB=null channel=" + key.channel() + "cST=" + POut.toString(channelSubTable)); } } // Have you dealt with all the interest ops? if ((!key.isValid()) || readyOps == 0) { try { keyIt.remove(); } catch (ConcurrentModificationException e) { log.error(e.toString()); } } } // pt.stop(log, "HandleSelectCBs took"); } catch (ClosedSelectorException e) { log.warn("Selector closed."); shutdown(); } } protected void shutdown() { if (!ranShutdown) { log.debug("Running shutdown hook."); forceExit = true; try { log.debug("Closing selector with all connections."); selector.close(); } catch (IOException e) { log.error("Could not close selector"); } log.debug("Shutdown hook complete."); ranShutdown = true; } } class ChannelSub { protected CB1R<Boolean, SelectionKey> acceptCB = null; protected CB1R<Boolean, SelectionKey> connectCB = null; protected CB1R<Boolean, SelectionKey> readCB = null; protected CB1R<Boolean, SelectionKey> writeCB = null; void setCommCBs(int selectionKey, CB1R<Boolean, SelectionKey> commCB) { if ((selectionKey & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { acceptCB = commCB; } if ((selectionKey & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT) { connectCB = commCB; } if ((selectionKey & SelectionKey.OP_READ) == SelectionKey.OP_READ) { readCB = commCB; } if ((selectionKey & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { writeCB = commCB; } } CB1R<Boolean, SelectionKey> getCommCB(int selectionKey) { if ((selectionKey & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { return acceptCB; } if ((selectionKey & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT) { 
return connectCB; } if ((selectionKey & SelectionKey.OP_READ) == SelectionKey.OP_READ) { return readCB; } if ((selectionKey & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { return writeCB; } log.error("Unknown selectionKey"); return null; } boolean checkCommCBs(int selectionKey) { if ((selectionKey & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT && acceptCB == null) { return false; } if ((selectionKey & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT && connectCB == null) { return false; } if ((selectionKey & SelectionKey.OP_READ) == SelectionKey.OP_READ && readCB == null) { return false; } if ((selectionKey & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && writeCB == null) { return false; } return true; } boolean isEmpty() { return acceptCB == null && connectCB == null && readCB == null && writeCB == null; } public String toString() { return "acceptCB=" + (acceptCB != null) + "[" + (acceptCB != null ? acceptCB.toString() : "") + "] " + "connectCB=" + (connectCB != null) + "[" + (connectCB != null ? connectCB.toString() : "") + "] " + "readCB=" + (readCB != null) + "[" + (readCB != null ? readCB.toString() : "") + "] " + "writeCB=" + (writeCB != null) + "[" + (writeCB != null ? 
writeCB.toString() : "") + "]"; } } public static void main(String[] args) { EL.set(new EL()); log.main("Registering callback"); EL.get().registerTimerCB(new SimpleCB("NORM-3", 1000), HIGH); EL.get().registerTimerCB(new SimpleCB("NORM-2", 1000), NORMAL); EL.get().registerTimerCB(new SimpleCB("NORM-1", 1000), LOW); // EventLoop.get().registerTimerCB(100, new SimpleCB("NORM-A", 1000), // NORMAL); // EventLoop.get().registerTimerCB(200, new SimpleCB("NORM-B", 1000), // NORMAL); // EventLoop.get().registerTimerCB(300, new SimpleCB("NORM-C", 1000), // NORMAL); EL.get().dumpState(true); // EventLoop.get().registerTimerCB(new SimpleCB("HIGH", 1500), HIGH); // EventLoop.get().registerTimerCB(new SimpleCB("NORM", 1000), NORMAL); // EventLoop.get().registerTimerCB(new SimpleCB("LOW", 500), LOW); CB0 cbWhileLoop = new CB0() { protected void cb(CBResult result) { // do stuff boolean exitCondition = false; if (!exitCondition) EL.get().registerTimerCB(this); } }; EL.get().registerTimerCB(cbWhileLoop); EL.get().main(); } } class SimpleCB extends CB0 { private long lastRun; private long interval; public SimpleCB(String name, long interval) { super(name); this.interval = interval; } protected void cb(CBResult result) { //EventLoop.get().dumpState(true); long currentTime = System.currentTimeMillis(); if (lastRun == 0) lastRun = currentTime; POut.p("Running cb " + toString() + "... 
" + (currentTime - lastRun)); lastRun = currentTime; //EventLoop.get().registerTimerCB(interval, this); } } --- NEW FILE: Barrier.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 28, 2005 */ package edu.harvard.syrah.sbon.async; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0R; import edu.harvard.syrah.sbon.async.EL.Priority; /** * * Allows us to synchronize callbacks * */ public class Barrier { private static final Log log = new Log(Barrier.class); private static final long PREDICATE_INTERVAL = 1000; private boolean triggered = false; private boolean startState; private boolean activated = true; private int count; private CB0 cb; private long delay; protected CB0 userCB; public Barrier(boolean startState) { this(startState, 0, null); } public Barrier(int count) { this(count == 0 ? true : false, count, null); } public Barrier(int count, CB0 cb) { this(count == 0 ? 
true : false, count, cb); } public Barrier(final CB0R<Boolean> cbPredicate) { this.startState = false; this.count = 1; this.triggered = startState; EL.get().registerTimerCB(new CB0() { protected void cb(CBResult resultOK) { if (cbPredicate.call(resultOK)) Barrier.this.join(); else EL.get().registerTimerCB(PREDICATE_INTERVAL, this); } }); } private Barrier(boolean startState, int count, CB0 cb) { this.startState = startState; this.count = count; this.cb = cb; this.triggered = startState; } public void activate() { activated = true; } public void deactivate() { activated = false; } public boolean isActive() { return activated; } public void fork() { assert startState || !triggered || !activated : "Could not fork: startState=" + startState + " triggered=" + triggered + " activated=" + activated; startState = false; count++; triggered = false; } public void join() { assert startState || !triggered || !activated : "Could not join: startState=" + startState + " triggered=" + triggered + " activated=" + activated; startState = false; count--; if (count == 0 && activated) { triggered = true; if (cb != null) { log.debug("Barrier triggered."); // Register the event callback EL.get().registerTimerCB(delay, cb); } if (userCB != null) { EL.get().registerTimerCB(new CB0() { public void cb(CBResult result) { userCB.cb(CBResult.OK()); } }); } } } /* * added by glp */ public void setNumForks(int newCount){ this.count = newCount; if (newCount > 0) { triggered = false; } } public void remove() { log.debug("Barrier removed."); } public void registerCB(CB0 myUserCB) { this.userCB = myUserCB; if (triggered) { EL.get().registerTimerCB(new CB0() { public void cb(CBResult result) { userCB.cb(result); } }); } } void registerTimerCB(long delay, CB0 cb, Priority priority) { this.cb = cb; this.delay = delay; if (triggered) { log.debug("Barrier triggered."); // Register the event callback EL.get().registerTimerCB(delay, cb, priority); } } public String toString() { return "count=" + count + " 
triggered=" + triggered; } } --- NEW FILE: CBResult.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jul 11, 2005 */ package edu.harvard.syrah.sbon.async; import java.io.Serializable; public class CBResult implements Serializable { static final long serialVersionUID = 1000000001L; public enum CBState {OK, ERROR, UNKNOWN, TIMEOUT} public CBState state; public String what; private CBResult(CBState state) { this(state, null); } private CBResult(CBState state, String what) { this.state = state; this.what = what; } public static CBResult OK() { return new CBResult(CBState.OK); } public static CBResult OK(String what) { return new CBResult(CBState.OK, what); } public static CBResult ERROR() { return new CBResult(CBState.ERROR); } public static CBResult ERROR(String what) { return new CBResult(CBState.ERROR, what); } public static CBResult ERROR(Exception e) { return new CBResult(CBState.ERROR, e.toString()); } public static CBResult UNKNOWN() { return new CBResult(CBState.UNKNOWN); } public static CBResult UNKNOWN(String what) { return new CBResult(CBState.UNKNOWN, what); } public static CBResult TIMEOUT() { return new CBResult(CBState.TIMEOUT); } public static CBResult TIMEOUT(String what) { return new CBResult(CBState.TIMEOUT, what); } public String toString() { switch (state) { case OK : return "OK"; case ERROR : return "ERROR(" + what + ")"; case UNKNOWN : return "UNKNOWN(" + what + ")"; case TIMEOUT : return "TIMEOUT(" + what + ")"; default : return "?"; } } } --- NEW FILE: CBQueue.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Nov 8, 2005 */ package edu.harvard.syrah.sbon.async; import java.util.LinkedList; import java.util.List; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; public class CBQueue { private static final Log log 
= new Log(CBQueue.class); private static final long RETRY_INTERVAL = 1000; private int maxCBs; private List<CB1<CB1<Boolean>>> waitingCBs = new LinkedList<CB1<CB1<Boolean>>>(); private int outstandingCBs = 0; public CBQueue(int maxCBs) { this.maxCBs = maxCBs; } public void enqueueDequeue(final CB1<CB1<Boolean>> newCB) { //log.debug("outstandingCBs=" + outstandingCBs + " maxCBs=" + maxCBs + " waitingCBs.size=" + waitingCBs.size()); if (newCB != null) waitingCBs.add(newCB); if ((maxCBs == 0 || outstandingCBs < maxCBs) && !waitingCBs.isEmpty()) { final CB1<CB1<Boolean>> cb = waitingCBs.remove(0); outstandingCBs++; cb.call(CBResult.OK(), new CB1<Boolean>() { protected void cb(CBResult result, Boolean handled) { outstandingCBs--; if (handled) { dequeue(); } else { waitingCBs.add(0, cb); EL.get().registerTimerCB(RETRY_INTERVAL, new CB0() { protected void cb(CBResult result) { dequeue(); } }); } } }); } } public void dequeue() { enqueueDequeue(null); } } --- NEW FILE: LoopIt.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Aug 11, 2005 */ package edu.harvard.syrah.sbon.async; import java.util.ConcurrentModificationException; import java.util.Iterator; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0R; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.async.CallbacksIF.CB2; import edu.harvard.syrah.sbon.async.EL.Priority; public class LoopIt<T> { protected static final Log log = new Log(LoopIt.class); // Maximum time of single loop iteration in nanoseconds private static final long MAX_SINGLE_ITERATION_TIME = 1 * 1000000; private String name; protected CB2<T, CB0R<Boolean>> cbIteratorRecursion; private Priority priority; protected Iterator<T> it; private boolean remove; private boolean smartScheduling; protected Barrier loopBarrier; public LoopIt(Iterable<T> it, final CB1<T> 
cbRecursion) { this(null, it, false, false, new CB2<T, CB0R<Boolean>>() { protected void cb(CBResult result, T item, CB0R<Boolean> cbMore) { cbRecursion.call(result, item); cbMore.callOK(); } }); } public LoopIt(Iterable<T> it, final CB2<T, CB0> cbRecursion) { this(null, it, false, false, new CB2<T, CB0R<Boolean>>() { protected void cb(CBResult result, T item, final CB0R<Boolean> cbMore) { cbRecursion.call(result, item, new CB0() { protected void cb(CBResult result) { cbMore.callOK(); } }); } }); } public LoopIt(String name, Iterable<T> it, CB2<T, CB0R<Boolean>> cbIteratorRecursion) { this(name, it, true, false, cbIteratorRecursion, Priority.NORMAL); } public LoopIt(String name, Iterable<T> it, boolean remove, CB2<T, CB0R<Boolean>> cbIteratorRecursion) { this(name, it, remove, false, cbIteratorRecursion, Priority.NORMAL); } public LoopIt(String name, Iterable<T> it, boolean remove, boolean smartScheduling, CB2<T, CB0R<Boolean>> cbIteratorRecursion) { this(name, it, remove, smartScheduling, cbIteratorRecursion, Priority.NORMAL); } public LoopIt(String name, Iterable<T> it, boolean remove, boolean smartScheduling, CB2<T, CB0R<Boolean>> cbIteratorRecursion, Priority priority) { this.name = name; this.it = it.iterator(); this.remove = remove; this.smartScheduling = smartScheduling; this.cbIteratorRecursion = cbIteratorRecursion; this.priority = priority; loopBarrier = new Barrier(1); } public void execute(CB0 cbDone) { EL.get().registerTimerCB(execute(), cbDone); } public Barrier execute() { EL.get().registerTimerCB(new CB0(name) { protected void cb(CBResult resultOK) { recurseIterator(); } }); return loopBarrier; } long startTime; long runningTime; public void recurseIterator() { if (startTime == 0) startTime = System.nanoTime(); if (it.hasNext()) { try { final T item = it.next(); runningTime = System.nanoTime() - startTime; if (smartScheduling && runningTime < MAX_SINGLE_ITERATION_TIME) { cbIteratorRecursion.call(CBResult.OK(), item, new CB0R<Boolean>() { protected 
Boolean cb(CBResult result) { // Try to remove the object after an iteration to reclaim memory if (remove) { try { it.remove(); } catch (UnsupportedOperationException e) { /* ignore */ } } boolean hasNext = it.hasNext(); recurseIterator(); return hasNext; } }); } else { EL.get().registerTimerCB(new CB0(name) { protected void cb(CBResult result) { runningTime = 0; cbIteratorRecursion.call(CBResult.OK(), item, new CB0R<Boolean>() { protected Boolean cb(CBResult result) { // Try to remove the object after an iteration to reclaim memory if (remove) { try { it.remove(); } catch (UnsupportedOperationException e) { /* ignore */ } } boolean hasNext = it.hasNext(); recurseIterator(); return hasNext; } }); } }, priority); } } catch (ConcurrentModificationException e) { log.error("Concurrent loop modification: name=" + name + " it=" + it + " e=" + e); } } else { loopBarrier.join(); startTime = 0; } } } --- NEW FILE: Loop.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Aug 11, 2005 */ package edu.harvard.syrah.sbon.async; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.async.CallbacksIF.CB2; import edu.harvard.syrah.sbon.async.EL.Priority; public class Loop<T> { protected static final Log log = new Log(Loop.class); private String name; protected CB2<T, CB1<T>> cbRecursion; private Priority priority; private T startItem; protected Barrier loopBarrier; public Loop(String name, T item, CB2<T, CB1<T>> cbRecursion) { this(name, item, cbRecursion, Priority.NORMAL); } public Loop(String name, T item, CB2<T, CB1<T>> cbRecursion, Priority priority) { this.name = name; this.cbRecursion = cbRecursion; this.startItem = item; this.priority = priority; this.loopBarrier = new Barrier(false); } public Barrier execute() { recurse(startItem); /* * EventLoop.get().registerTimerCB(new EventCB(name) { 
protected void * cb(CBResult result, Event timerEvent) { recurse(startItem); } }, * priority); */ return loopBarrier; } public void recurse(final T item) { log.debug("Forking barrier=" + loopBarrier); loopBarrier.fork(); log.debug("Forked barrier=" + loopBarrier); EL.get().registerTimerCB(new CB0(name) { protected void cb(CBResult result) { cbRecursion.call(CBResult.OK(), item, new CB1<T>() { protected void cb(CBResult result, T item) { if (item != null) recurse(item); else { log.debug("Joining barrier" + loopBarrier); loopBarrier.join(); log.debug("Joined barrier" + loopBarrier); } } }); } }, priority); } } --- NEW FILE: Config.java --- package edu.harvard.syrah.sbon.async; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.*; import edu.harvard.syrah.prp.Log; public class Config { private static final Log log = new Log(Config.class); private static Properties configProps = System.getProperties(); public static Properties getConfigProps() { return configProps; } public static void setConfigProps(Properties props) { configProps = props; } public static String getProperty(String name, String defaultValue) { return configProps.getProperty(name, defaultValue); } public static String getProperty(String name) { return configProps.getProperty(name); } public static void read(String configRoot, String filename) { try { InputStream ip = new FileInputStream(filename); //log.info("config=" + filename); Properties configFileProps = new Properties(); configFileProps.load(ip); for (Object propertyObj : configFileProps.keySet()) { String property = (String) propertyObj; String value = configFileProps.getProperty(property); if (!Config.getConfigProps().containsKey(property) && value != null && value.length() != 0) { //log.debug("prop=" + property + " val=" + value); Config.getConfigProps().setProperty(property, value); } } SortedMap<String, String> allProps = new TreeMap<String, String>(); 
Properties cleanProps = new Properties(); for (Object propertyObj : Config.getConfigProps().keySet()) { //for (Iterator<Object> objIt = Config.getConfigProps().keySet().iterator(); objIt.hasNext();) { String property = (String) propertyObj; if (property.startsWith(configRoot)) { String shortName = property.substring(configRoot.length() + 1, property.length()); allProps.put(shortName, Config.getConfigProps().getProperty(property)); cleanProps.put(shortName, Config.getConfigProps().getProperty(property)); } } Config.configProps = cleanProps; log.info("configProperties=" + allProps.toString()); //log.debug("cleanProps=" + cleanProps.toString()); } catch (FileNotFoundException e1) { log.error("Could not open config file: " + e1); } catch (IOException e2) { log.error("Could not load config properties from file: " + e2); } } } --- NEW FILE: Sync.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jun 21, 2006 */ package edu.harvard.syrah.sbon.async; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; public class Sync { private static final Log log = new Log(Sync.class); private static int threadCounter = 0; /* * TODO convert this to use a thread pool */ public static void callBlocking(final CB0 cbBlocking, final CB0 cbDone) { final Thread blockingThread = new Thread("BlockingThread-" + (++threadCounter)) { @Override public void run() { super.run(); cbBlocking.callOK(); EL.get().registerTimerCB(new CB0() { protected void cb(CBResult result) { cbDone.callOK(); } }); } }; blockingThread.run(); } } --- NEW FILE: EventLoopIF.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 6, 2005 */ package edu.harvard.syrah.sbon.async; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import 
edu.harvard.syrah.sbon.async.CallbacksIF.*; import edu.harvard.syrah.sbon.async.EL.Priority; /** * * This is the interface of the main asynchronous event loop for the node. * */ public interface EventLoopIF { /** * Main method that runs the event loop. It should be the last call in the program's main method. */ public void main(); /** * Force an exit from the main event loop. */ public void forceExit(); /** * Gracefully exit the main loop. This will only exit the main event loop if there are no pending * timers. */ public void exit(); public boolean shouldExit(); public CB0 registerTimerCB(CB0 cb); public CB0 registerTimerCB(CB0 cbEvent, Priority priority); /** * This method allows an object to register a callback timer. * * @param delay * @param cbEvent */ public CB0 registerTimerCB(long delay, CB0 cbEvent); public CB0 registerTimerCB(long delay, CB0 cbEvent, Priority priority); /** * Register a new callback timer when the barrier permits it. * * @param barrier * @param cbEvent */ public void registerTimerCB(Barrier barrier, CB0 cbEvent); public void registerTimerCB(Barrier barrier, CB0 cbEvent, Priority priority); public void registerTimerCB(Barrier barrier, long delay, CB0 cbEvent); public void registerTimerCB(Barrier barrier, long delay, CB0 cbEvent, Priority priority); /** * Remove an existing timer callback from the event queue. * * @param cb */ public long deregisterTimerCB(CB0 cb); public long deregisterTimerCB(CB0 cb, Priority priority); /** * Registers interest in communication events. Calls to this method are additive: the ops in * selectionKey are added to the channel's existing interest set rather than replacing it. * * @param channel * @param selectionKey */ public void registerCommCB(SelectableChannel channel, int selectionKey) throws ClosedChannelException; /** * Allows an object to deregister a communication callback. 
* * @param channel * @param selectionKey * @throws ClosedChannelException */ public void deregisterCommCB(SelectableChannel channel, int selectionKey) throws ClosedChannelException; public void deregisterAllCommCBs(SelectableChannel channel) throws ClosedChannelException; public void setCommCB(SelectableChannel channel, int selectionKey, CB1R<Boolean, SelectionKey> commCB); public void unsetCommCB(SelectableChannel channel, int selectionKey); public CB1R<Boolean, SelectionKey> getCommCB(SelectableChannel channel, int selectionkey); public void unsetAllCommCBs(SelectableChannel channel); public boolean checkChannelState(SelectableChannel channel, int selectionKey); public void handleNetwork(); public void dumpState(boolean eventQueueDump); } --- NEW FILE: CallbacksIF.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 28, 2005 */ package edu.harvard.syrah.sbon.async; import java.util.LinkedList; import java.util.List; /** * * Asynchronous callback interface * */ public interface CallbacksIF { public abstract class AbstractCB implements Comparable { private long timeout = 0; private String name = null; protected boolean cancelled = false; private List<CB0> cancelledCBs = null; protected CB0 cbTimeout = null; protected long ts; public AbstractCB() { /* empty */ } public AbstractCB(long timeout) { setTimeout("CBTimeout", timeout); } public AbstractCB(String name) { this.name = name; } public abstract void callCBResult(CBResult result); protected boolean checkCancelled(CBResult result) { if (cbTimeout != null) { EL.get().deregisterTimerCB(cbTimeout); cbTimeout = null; } boolean returnResult = cancelled; if (result.state == CBResult.CBState.ERROR || result.state == CBResult.CBState.TIMEOUT) { cancelled = true; } return returnResult; } public void setTimeout(String name, final long timeout) { if (timeout == 0) return; this.timeout = timeout; // Register the timer for the timeout cbTimeout = 
EL.get().registerTimerCB(timeout, new CB0(name) { public void cb(CBResult result) { if (!cancelled) { // Call the timeout callback AbstractCB.this.callCBResult(CBResult.TIMEOUT(timeout + "ms")); AbstractCB.this.cancel(); } AbstractCB.this.cbTimeout = null; } }); } public void cancel() { cancelled = true; if (cbTimeout != null) { EL.get().deregisterTimerCB(cbTimeout); cbTimeout = null; } if (cancelledCBs != null) { for (CB0 cancelledCB : cancelledCBs) cancelledCB.callOK(); } } public void cancel(CB0 cb) { cancel(); cb.callOK(); } public boolean isCancelled() { return cancelled; } public void registerCancelledCB(CB0 cancelledCB) { if (cancelledCBs == null) cancelledCBs = new LinkedList<CB0>(); cancelledCBs.add(cancelledCB); } public void deregisterCancelledCB(CB0 cancelledCB) { /* The list is created lazily, so it may still be null here */ if (cancelledCBs != null) cancelledCBs.remove(cancelledCB); } public void deregisterAllCancelledCBs() { if (cancelledCBs != null) cancelledCBs.clear(); } public boolean hasTimeout() { return cbTimeout != null; } public String toString() { String cbName = super.toString(); /* Parenthesised because + binds tighter than ==; without the parentheses the null check is never true */ return (ts != 0 ? String.valueOf(ts) : "") + (name == null ? cbName.substring(cbName.lastIndexOf('.') + 1) : name); } public long getTS() { return ts; } public int compareTo(Object obj) { assert obj instanceof AbstractCB; AbstractCB cmpCB = (AbstractCB) obj; // log.debug("this=" + this + " cmpEvent=" + cmpEvent); // Equality means that this is the same object if (this == cmpCB) { // log.debug("equal"); return 0; } /* * >= */ int comparison = (ts > cmpCB.ts) ?
1 : -1; // log.debug("comparison=" + comparison); return comparison; } } public abstract class CB0 extends AbstractCB { public CB0() { /* emtpty */ } public CB0(long timeout) { super(timeout); } public CB0(String name) { super(name); } protected abstract void cb(CBResult result); public void callCBResult(CBResult result) { if (!checkCancelled(result)) cb(result); } public void callOK() { if (!checkCancelled(CBResult.OK())) cb(CBResult.OK()); } public void callERROR() { if (!checkCancelled(CBResult.ERROR())) cb(CBResult.ERROR()); } public void call(CBResult result) { if (!checkCancelled(result)) cb(result); } } public abstract class CB1<T1> extends AbstractCB { public CB1() { /* emtpty */ } public CB1(String name) { super(name); } public CB1(long timeout) { super(timeout); } protected abstract void cb(CBResult result, T1 arg1); public void callCBResult(CBResult result) { cb(result, null); } public void call(CBResult result, T1 arg1) { if (!checkCancelled(result)) cb(result, arg1); } } public abstract class CB2<T1, T2> extends AbstractCB { public CB2() { /* emtpty */ } public CB2(long timeout) { super(timeout); } protected abstract void cb(CBResult result, T1 arg1, T2 arg2); public void callCBResult(CBResult result) { cb(result, null, null); } public void call(CBResult result, T1 arg1, T2 arg2) { if (!checkCancelled(result)) cb(result, arg1, arg2); } } public abstract class CB3<T1, T2, T3> extends AbstractCB { public CB3() { /* emtpty */ } public CB3(long timeout) { super(timeout); } public CB3(String name) { super(name); } protected abstract void cb(CBResult result, T1 arg1, T2 arg2, T3 arg3); public void callCBResult(CBResult result) { cb(result, null, null, null); } public void call(CBResult result, T1 arg1, T2 arg2, T3 arg3) { if (!checkCancelled(result)) cb(result, arg1, arg2, arg3); } } public abstract class CB4<T1, T2, T3, T4> extends AbstractCB { public CB4() { /* emtpty */ } public CB4(long timeout) { super(timeout); } public CB4(String name) { super(name); 
} protected abstract void cb(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4); public void callCBResult(CBResult result) { cb(result, null, null, null, null); } public void call(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4) { if (!checkCancelled(result)) cb(result, arg1, arg2, arg3, arg4); } } public abstract class CB5<T1, T2, T3, T4, T5> extends AbstractCB { public CB5() { /* emtpty */ } public CB5(long timeout) { super(timeout); } protected abstract void cb(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5); public void callCBResult(CBResult result) { cb(result, null, null, null, null, null); } public void call(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5) { if (!checkCancelled(result)) cb(result, arg1, arg2, arg3, arg4, arg5); } } public abstract class CB6<T1, T2, T3, T4, T5, T6> extends AbstractCB { public CB6() { /* emtpty */ } public CB6(long timeout) { super(timeout); } protected abstract void cb(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6); public void callCBResult(CBResult result) { cb(result, null, null, null, null, null, null); } public void call(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6) { if (!checkCancelled(result)) cb(result, arg1, arg2, arg3, arg4, arg5, arg6); } } public abstract class CB7<T1, T2, T3, T4, T5, T6, T7> extends AbstractCB { public CB7() { /* emtpty */ } public CB7(long timeout) { super(timeout); } protected abstract void cb(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7); public void callCBResult(CBResult result) { cb(result, null, null, null, null, null, null, null); } public void call(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7) { if (!checkCancelled(result)) cb(result, arg1, arg2, arg3, arg4, arg5, arg6, arg7); } } public abstract class CB0R<R> extends AbstractCB { public CB0R() { /* emtpty */ } public CB0R(long timeout) { super(timeout); } protected abstract R cb(CBResult 
result); public void callCBResult(CBResult result) { cb(result); } public R call(CBResult result) { if (!checkCancelled(result)) return cb(result); return null; } public void callOK() { if (!checkCancelled(CBResult.OK())) cb(CBResult.OK()); } } public abstract class CBR<R, T1> extends AbstractCB { public CBR() { /* emtpty */ } public CBR(long timeout) { super(timeout); } protected abstract R cb(CBResult result, T1 arg1); public void callCBResult(CBResult result) { cb(result, null); } public R call(CBResult result, T1 arg1) { if (!checkCancelled(result)) return cb(result, arg1); return null; } } public abstract class CB1R<R, T1> extends AbstractCB { public CB1R() { /* emtpty */ } public CB1R(long timeout) { super(timeout); } protected abstract R cb(CBResult result, T1 arg1); public void callCBResult(CBResult result) { cb(result, null); } public R call(CBResult result, T1 arg1) { if (!checkCancelled(result)) return cb(result, arg1); return null; } } public abstract class CB2R<R, T1, T2> extends AbstractCB { public CB2R() { /* emtpty */ } public CB2R(long timeout) { super(timeout); } protected abstract R cb(CBResult result, T1 arg1, T2 arg2); public void callCBResult(CBResult result) { cb(result, null, null); } public R call(CBResult result, T1 arg1, T2 arg2) { if (!checkCancelled(result)) return cb(result, arg1, arg2); return null; } } public abstract class CB3R<R, T1, T2, T3> extends AbstractCB { public CB3R() { /* emtpty */ } public CB3R(long timeout) { super(timeout); } protected abstract R cb(CBResult result, T1 arg1, T2 arg2, T3 arg3); public void callCBResult(CBResult result) { cb(result, null, null, null); } public R call(CBResult result, T1 arg1, T2 arg2, T3 arg3) { if (!checkCancelled(result)) return cb(result, arg1, arg2, arg3); return null; } } public abstract class CB4R<R, T1, T2, T3, T4> extends AbstractCB { public CB4R() { /* emtpty */ } public CB4R(long timeout) { super(timeout); } protected abstract R cb(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 
arg4); public void callCBResult(CBResult result) { cb(result, null, null, null, null); } public R call(CBResult result, T1 arg1, T2 arg2, T3 arg3, T4 arg4) { if (!checkCancelled(result)) return cb(result, arg1, arg2, arg3, arg4); return null; } } } |
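The cancellation behaviour shared by the callback classes in CallbacksIF.java can be sketched in isolation. The example below is a minimal illustration only and is not part of the commit: CancellableCallbackSketch, State and Callback are hypothetical stand-ins for CBResult.CBState and CB0, and the timer and event-loop parts are left out. It shows the rule visible in checkCancelled(): the check returns the flag as it was before the call, so an ERROR or TIMEOUT result is still delivered once and only later calls are dropped.

```java
import java.util.ArrayList;
import java.util.List;

public class CancellableCallbackSketch {

    // Hypothetical stand-in for CBResult.CBState.
    enum State { OK, ERROR, TIMEOUT }

    // Hypothetical stand-in for CB0, reduced to the cancellation logic.
    abstract static class Callback {
        private boolean cancelled = false;

        protected abstract void cb(State state);

        // Returns the cancelled flag as it was BEFORE this call.
        // An ERROR or TIMEOUT result marks the callback as cancelled
        // for every later call.
        private boolean checkCancelled(State state) {
            boolean wasCancelled = cancelled;
            if (state == State.ERROR || state == State.TIMEOUT) {
                cancelled = true;
            }
            return wasCancelled;
        }

        // Invokes the user code only if the callback was not cancelled.
        public void call(State state) {
            if (!checkCancelled(state)) {
                cb(state);
            }
        }

        public void cancel() { cancelled = true; }

        public boolean isCancelled() { return cancelled; }
    }

    // Fires three results at one callback and records what got through.
    static List<State> run() {
        final List<State> seen = new ArrayList<>();
        Callback cb = new Callback() {
            @Override
            protected void cb(State state) {
                seen.add(state);
            }
        };
        cb.call(State.OK);      // delivered
        cb.call(State.TIMEOUT); // delivered, then the callback is cancelled
        cb.call(State.OK);      // dropped
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [OK, TIMEOUT]
    }
}
```

In the committed code only the variants that route through checkCancelled() follow this rule. For example, CB1.callCBResult() calls cb() directly, so a timeout reaches it even after cancellation.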
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/obj In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21526/src/edu/harvard/syrah/sbon/async/comm/obj Added Files: ObjCommRRCB.java ObjMessage.java ObjCommIF.java ObjMessageIF.java ObjComm.java ObjCommCB.java Log Message: Initial commit of AsyncJ project needed for Pyxida --- NEW FILE: ObjCommIF.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 5, 2005 */ package edu.harvard.syrah.sbon.async.comm.obj; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.comm.AddressIF; import edu.harvard.syrah.sbon.async.comm.TCPCommIF; /** * * API for the Communication layer using object serialisation * */ public interface ObjCommIF extends TCPCommIF { /** * Sends a new message to a destination address. This will reuse existing connections. * * @param message * @param destAddr */ public void sendMessage(ObjMessageIF message, AddressIF destAddr, CB0 cbSent); public void sendMessage(ObjMessageIF message, AddressIF destAddr, boolean bestEffort, CB0 cbSent); /** * Sends a request/reply message. * * @param message * @param destAddr * @param cbResponseMessage */ public void sendRequestMessage(ObjMessageIF message, AddressIF destAddr, ObjCommRRCB<? extends ObjMessageIF> cbResponseMessage); /** * Sends a request/reply message and also registers a handler for an error response. * * @param message * @param destAddr * @param cbResponseMessage * @param cbErrorMessage */ public void sendRequestMessage(ObjMessageIF message, AddressIF destAddr, ObjCommRRCB<? extends ObjMessageIF> cbResponseMessage, ObjCommRRCB<? 
extends ObjMessageIF> cbErrorMessage); public void sendResponseMessage(ObjMessageIF message, AddressIF destAddr, long requestMsgId, CB0 cbSent); public void sendErrorMessage(ObjMessageIF message, AddressIF destAddr, long requestMsgId, CB0 cbSent); /** * Enables objects to register their interest in incoming messages. The callback method will * be invoked if a message of class messageClass is received. * * @param messageClass * @param callback */ public void registerMessageCB(Class messageClass, ObjCommCB<? extends ObjMessageIF> cb); public void deregisterMessageCB(Class messageClass, ObjCommCB<? extends ObjMessageIF> cb); } --- NEW FILE: ObjCommCB.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 6, 2005 */ package edu.harvard.syrah.sbon.async.comm.obj; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.async.CallbacksIF.CB4; import edu.harvard.syrah.sbon.async.comm.AddressIF; /** * * Callback API for the Comm module. 
* */ public abstract class ObjCommCB<T extends ObjMessageIF> extends CB4<T, AddressIF, Long, CB1<Boolean>> { public ObjCommCB() { super(); } public ObjCommCB(String name) { super(name); } public ObjCommCB(long timeout) { super(timeout); } } --- NEW FILE: ObjMessage.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 7, 2005 */ package edu.harvard.syrah.sbon.async.comm.obj; import edu.harvard.syrah.prp.Log; /** * * Abstract representation of a message * */ public abstract class ObjMessage implements ObjMessageIF { private static final Log log = new Log(ObjMessage.class); static final long serialVersionUID = 1000000001L; // Message id for the message private Long messageId = null; private boolean isResponse = false; private boolean isError = false; public boolean hasMsgId() { return messageId != null; } public long getMsgId() { if (messageId == null) log.error("Accessed messageId for a message that doesn't have one."); return messageId; } public void setMsgId(long messageId) { this.messageId = messageId; } public void setResponse(boolean isResponse) { this.isResponse = isResponse; } public boolean isResponse() { return isResponse; } public boolean isError() { return isError; } public void setError(boolean isError) { this.isError = isError; } } --- NEW FILE: ObjCommRRCB.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 6, 2005 */ package edu.harvard.syrah.sbon.async.comm.obj; import edu.harvard.syrah.sbon.async.CallbacksIF.CB3; import edu.harvard.syrah.sbon.async.comm.AddressIF; /** * * Callback API for the Comm module. 
* */ public abstract class ObjCommRRCB<T extends ObjMessageIF> extends CB3<T, AddressIF, Long> { public ObjCommRRCB() { super(); } public ObjCommRRCB(String name) { super(name); } public ObjCommRRCB(long timeout) { super(timeout); } } --- NEW FILE: ObjComm.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 7, 2005 */ package edu.harvard.syrah.sbon.async.comm.obj; import java.io.IOException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import edu.harvard.syrah.prp.*; import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.Config; import edu.harvard.syrah.sbon.async.EL; import edu.harvard.syrah.sbon.async.LoopIt; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0R; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.async.CallbacksIF.CB2; import edu.harvard.syrah.sbon.async.comm.*; /** * * Implementation of the Comm module with Java object serialisation * * TODO - Short-cut for messages sent to localhost * */ public class ObjComm extends TCPComm implements ObjCommIF { protected static final Log log = new Log(ObjComm.class); // Number of times to retry sending when buffer is full private static final int MAX_SEND_RETRIES = 3; // Retry a message send after this many ms private static final long SEND_RETRY_DELAY = 2500; // Timeout for all request/response calls (3min) private static final long RR_MSG_TIMEOUT = 180000; // Timeout for the CB that handles an RR ObjComm msg (30s) private static final long RR_CB_TIMEOUT = 30000; // Timeout for the CB that handles a regular ObjComm msg (60s) private static final long CB_TIMEOUT = 60 * 1000; private static final int MAX_OUTSTANDING_CBS = 3; // was: 10 private 
static final int KEEPALIVE_TIMEOUT = Integer.parseInt(Config.getConfigProps().getProperty( "sbon.connection.obj.timeout", "30000")); private static final boolean TRACE = Boolean.valueOf(Config.getConfigProps().getProperty( "sbon.trace", "false")); // Callback table for message classes protected Map<Class, List<ObjCommCB>> callbackTable = new HashMap<Class, List<ObjCommCB>>(); // Callback table for request/response with message ids protected Map<Long, ObjCommRRCB> requestResponseCBTable = new HashMap<Long, ObjCommRRCB>(); // Callback table for request/error with message ids protected Map<Long, ObjCommRRCB> requestErrorCBTable = new HashMap<Long, ObjCommRRCB>(); public ObjComm() { super(); } public void initServer(AddressIF bindAddress, CB0 cbInit) { super.initServer(bindAddress, cbInit); log.main("Binding to addr=" + bindAddress + " (localAddr=" + localNetAddress + ")"); } public void sendMessage(ObjMessageIF message, AddressIF destAddr, CB0 cbSent) { sendMessage(message, destAddr, false, cbSent); } /* * @see edu.harvard.syrah.sbon.communication.CommIF#sendMessage(edu.harvard.syrah.sbon.communication.MessageIF, * edu.harvard.syrah.sbon.communication.AddressIF) */ public void sendMessage(ObjMessageIF message, AddressIF destAddr, boolean bestEffort, final CB0 cbSent) { // log.debug("Sending msg=" + message); // log.debug(" destAddr=" + destAddr); assert destAddr instanceof NetAddress : "Can only send messages to NetAddresses."; if (getLocalAddress() == null) { log.error("Must set local address with initServer before sending messages."); } final NetAddress remoteAddr = (NetAddress) destAddr; if (!remoteAddr.isResolved()) { EL.get().registerTimerCB(new CB0() { protected void cb(CBResult result) { cbSent.call(CBResult.ERROR("Cannot send a message to an unresolved address=" + remoteAddr.toString(false))); } }); return; } if (!remoteAddr.hasPort()) { EL.get().registerTimerCB(new CB0() { protected void cb(CBResult result) { cbSent.call(CBResult.ERROR("Cannot send a message 
without a valid port to address=" + remoteAddr.toString(false))); } }); return; } byte[] msgArray = marshallMsg(message); // log.debug("destConnectionPool.keySet()=" + destConnectionPool.keySet()); Comm.WriteConnHandler writeConnHandler = destConnectionPool.get(remoteAddr); // Do we have an existing connection that is alive if (writeConnHandler == null || (!checkConnection(writeConnHandler))) { log.debug("No existing connection found."); Comm.ConnectConnHandler connectConnHandler = createConnection(destAddr, true, KEEPALIVE_TIMEOUT, new CB0() { protected void cb(CBResult result) { switch (result.state) { case OK: { // ignore break; } case TIMEOUT: case ERROR: { cbSent.call(result); break; } } } }); // Add the message to the queue ((ConnectConnHandler) connectConnHandler).addRequest(new ObjRequest(msgArray, bestEffort, cbSent)); } else { log.debug("Existing connection to destAddr=" + destAddr + " found."); // Add the message to this handler ((WriteConnHandler) writeConnHandler).addRequest(new ObjRequest(msgArray, bestEffort, cbSent)); } // log.debug("Handling the network..."); EL.get().handleNetwork(); // log.debug("Done handling the network."); } private byte[] marshallMsg(ObjMessageIF message) { // log.debug("message=" + message); byte[] byteArray = NetUtil.serializeObject(message); // log.debug("message2=" + Util.deserializeObject(byteArray)); return byteArray; } public void sendRequestMessage(ObjMessageIF message, AddressIF destAddr, ObjCommRRCB<? extends ObjMessageIF> cbResponseMessage) { sendRequestMessage(message, destAddr, cbResponseMessage, null); } public void sendRequestMessage(ObjMessageIF message, final AddressIF destAddr, final ObjCommRRCB<? extends ObjMessageIF> cbResponseMessage, ObjCommRRCB<? 
extends ObjMessageIF> cbErrorMessage) { if (!cbResponseMessage.hasTimeout()) { // Add a timeout cbResponseMessage.setTimeout("ObjCommRRTimeout", RR_MSG_TIMEOUT); } long messageId = PUtil.getRandomPosLong(); // log.debug("Sending request message=" + message); // log.debug(" id=" + messageId + " to destAddr=" + destAddr); message.setMsgId(messageId); message.setResponse(false); if (requestResponseCBTable.containsKey(messageId)) { log.error("Picked a key that already exists in the requestResponseCBTable. This is very unlikely and probably a bug"); } requestResponseCBTable.put(messageId, cbResponseMessage); requestErrorCBTable.put(messageId, cbErrorMessage); // log.debug("requestResponseCBTable=" + // POut.toString(requestResponseCBTable)); // log.debug("requestErrorCBTable=" + POut.toString(requestErrorCBTable)); sendMessage(message, destAddr, false, new CB0() { protected void cb(CBResult result) { // Did the sending of the message fail? switch (result.state) { case OK: { // Ignore this because we expect the response message back and there // is a timeout on this break; } case ERROR: case TIMEOUT: { log.warn("Request/reponse failed. destAddr=" + destAddr + " :" + result); cbResponseMessage.call(result, null, destAddr, null); break; } } } }); } public void sendResponseMessage(ObjMessageIF message, AddressIF destAddr, long requestMsgId, CB0 cbSent) { assert requestMsgId > 0 : "msg ids need to be positive. msgId=" + requestMsgId; message.setMsgId(requestMsgId); message.setResponse(true); // log.debug("Sending response msg=" + message); // log.debug(" id=" + requestMsgId + " to destAddr=" + destAddr); sendMessage(message, destAddr, false, cbSent); } public void sendErrorMessage(ObjMessageIF message, AddressIF destAddr, long requestMsgId, CB0 cbSent) { assert requestMsgId > 0 : "msg ids need to be positive. 
msgId=" + requestMsgId; message.setMsgId(requestMsgId); message.setError(true); message.setResponse(true); // log.debug("Sending error msg=" + message); // log.debug(" id=" + requestMsgId + " to destAddr=" + destAddr); sendMessage(message, destAddr, false, cbSent); } /* * @see edu.harvard.syrah.sbon.communication.CommIF#registerCallback(edu.harvard.syrah.sbon.communication.CommCBIF) */ public void registerMessageCB(Class messageClass, ObjCommCB<? extends ObjMessageIF> newCallback) { log.debug("Registering new callback for class=" + messageClass); List<ObjCommCB> callbacks = callbackTable.get(messageClass); if (callbacks == null) { callbacks = new LinkedList<ObjCommCB>(); callbackTable.put(messageClass, callbacks); } callbacks.add(newCallback); } public void deregisterMessageCB(Class messageClass, ObjCommCB<? extends ObjMessageIF> oldCallback) { log.debug("Deregistering callback for class=" + messageClass); List<ObjCommCB> callbacks = callbackTable.get(messageClass); callbacks.remove(oldCallback); } protected AcceptConnHandler getAcceptConnHandler(NetAddress localAddress, CB0 cbHandler) { return new AcceptConnHandler(localAddress, cbHandler); } protected ConnectConnHandler getConnectConnHandler(NetAddress remoteAddress, long timeout, CB0 cbHandler) { return new ConnectConnHandler(remoteAddress, timeout, cbHandler); } protected ReadConnHandler getReadConnHandler(SelectableChannel channel, NetAddress remoteAddr) { ReadConnHandler readConnHandler = (ReadConnHandler) EL.get().getCommCB(channel, SelectionKey.OP_READ); if (readConnHandler == null) { readConnHandler = new ReadConnHandler(channel, remoteAddr); } return readConnHandler; } protected WriteConnHandler getWriteConnHandler(SelectableChannel channel, NetAddress remoteAddr) { WriteConnHandler writeConnHandler = (WriteConnHandler) EL.get().getCommCB(channel, SelectionKey.OP_WRITE); if (writeConnHandler == null) { writeConnHandler = new WriteConnHandler(channel, remoteAddr); destConnectionPool.put(remoteAddr, 
writeConnHandler); } return writeConnHandler; } /* * Callback to accept a new connection */ class AcceptConnHandler extends TCPComm.AcceptConnHandler { AcceptConnHandler(AddressIF bindAddress, CB0 cbHandler) { super(bindAddress, cbHandler); } public Boolean cb(CBResult result, SelectionKey key) { super.cb(result, key); if (key.isValid() && key.isAcceptable()) { ReadConnHandler readConnHandler = getReadConnHandler(channel, remoteAddr); readConnHandler.register(); /* * We can't allocate a writeConnHandler here yet because we don't know * the identity of the remote party */ } return true; } } /* * Callback for connecting a new connection */ class ConnectConnHandler extends TCPComm.ConnectConnHandler { private final Log log = new Log(ConnectConnHandler.class); private List<ObjRequest> objRequestList = new LinkedList<ObjRequest>(); ConnectConnHandler(NetAddress destAddr, long timeout, CB0 cbHandler) { super(destAddr, timeout, cbHandler); } public Boolean cb(CBResult result, SelectionKey key) { super.cb(result, key); if (key.isValid() && key.isConnectable()) { WriteConnHandler writeConnHandler = null; if (destConnectionPool.keySet().contains(remoteAddr)) { log.debug("Already existing connection. 
Reusing."); writeConnHandler = (WriteConnHandler) destConnectionPool.get(remoteAddr); try { channel.close(); } catch (IOException e) { log.error("Could not close channel: " + e); } } else { log.debug("Allocating new read and write handlers"); Comm.ReadConnHandler readConnHandler = getReadConnHandler(channel, remoteAddr); readConnHandler.register(); writeConnHandler = getWriteConnHandler(channel, remoteAddr); log.debug("Adding hello msg to write buffer."); // Put own address into buffer for other party writeConnHandler.addHelloMsg(); } writeConnHandler.addRequests(objRequestList); objRequestList.clear(); writeConnHandler.register(); // Return connection callbacks cbHandler.call(CBResult.OK()); } return true; } protected void addRequest(ObjRequest objRequest) { objRequestList.add(objRequest); } public String toString() { return super.toString() + "/" + objRequestList.size(); } } /* * Callback to read data from an existing connection */ class ReadConnHandler extends TCPComm.ReadConnHandler { protected final Log log = new Log(ReadConnHandler.class); protected int outstandingCBs = 0; protected boolean resolvingAddr = false; private long timestamp; ReadConnHandler(SelectableChannel channel, NetAddress destAddr) { super(channel, destAddr); } public Boolean cb(CBResult result, SelectionKey key) { super.cb(result, key); if (TRACE) timestamp = System.nanoTime(); if (key.isValid() && key.isReadable()) { /* * TODO: FIX ME this may be buggy since we might never hear back about * the last bytes read from a socket */ if (MAX_OUTSTANDING_CBS != 0 && outstandingCBs >= MAX_OUTSTANDING_CBS) { // log.warn("Too many outstanding callbacks (" + outstandingCBs + "). // Not reading more data. remoteAddr=" + remoteAddr); return false; } if (resolvingAddr) { // log.warn("Resolving address. Returning. 
remoteAddr=" + remoteAddr); return false; } channel = key.channel(); SocketChannel socketChannel = (SocketChannel) channel; // log.debug("channel=" + channel); //log.debug("destConnectionPool.keySet=" + destConnectionPool.keySet()); // Is there enough free buffer space to do another read? if (buffer.position() < MAX_BUFFER_SIZE) { log.debug("Reading. remoteAddr=" + remoteAddr + " buffer=" + buffer); if (buffer.remaining() == 0) { log.debug("Full read buffer for channel=" + channel + ". buffer=" + buffer + ". Extending..."); buffer = PUtil.extendByteBuffer(buffer, buffer.capacity() * 2); buffer.limit(buffer.capacity()); log.debug("new buffer=" + buffer); } int count = 0; try { count = socketChannel.read(buffer); //log.debug("Read count=" + count + " bytes"); } catch (IOException e) { log.warn("Error reading from socket: " + e + " remoteAddr=" + remoteAddr + " count=" + count + " key=" + key + " channel=" + channel); // + " // destCP=" + // POut.toString(destConnectionPool)); closeConnection(remoteAddr, socketChannel); return true; } if (count == 0) { log.warn("Odd. Received a selector key and only read 0 bytes. This is a bug?"); return true; } if (count == -1) { // Is this channel still open? log.debug("Connection to remoteAddr=" + remoteAddr + " has been closed."); log.debug(" channel=" + channel); if (remoteAddr != null) { WriteConnHandler writeConnHandler = (WriteConnHandler) destConnectionPool.get(remoteAddr); if (writeConnHandler != null && writeConnHandler.hasRequests()) { /* * TODO Fix this: move the requests to a new connection... */ log.warn("The connection had outstanding requests but was still closed."); } } closeConnection(remoteAddr, socketChannel); return true; } } else { // log.warn("Read buffer is full. Not reading more data. 
buffer=" + // buffer + " remoteAddr=" + remoteAddr); } // Try to receive a hello message receiveHello(new CB0() { protected void cb(CBResult result) { // log.debug("touching channel=" + channel); // Tell the timeout handler that this connection is in use // WriteConnHandler writeConnHandler = (WriteConnHandler) // EventLoop.get().getCommCB(channel, // SelectionKey.OP_WRITE); // Did we get enough bytes to read the length field? while ((remoteAddr != null) && (buffer.position() >= 4)) { buffer.flip(); int currentMsgSize = buffer.getInt(); if (currentMsgSize <= 0) { log.warn("Connection from remoteAddr=" + remoteAddr + " had msgSize=" + currentMsgSize + ". Closing."); closeConnection(remoteAddr, channel); return; } assert currentMsgSize > 0 : "currentMsgSize=" + currentMsgSize; //log.debug("before: buffer=" + buffer + " currentMsgSize=" + currentMsgSize); // Can we read a full message? if (buffer.limit() < currentMsgSize + 4) { // Prepare the buffer for more reading buffer.position(buffer.limit()); buffer.limit(buffer.capacity()); break; } byte[] msgArray = new byte[currentMsgSize]; buffer.get(msgArray); // Compact the buffer int limit = buffer.limit(); int pos = buffer.position(); buffer.compact(); buffer.limit(limit - pos); Object msgObject = unmarshallMsg(msgArray); if (msgObject == null) { log.warn("No valid msgObject received from remoteAddr=" + remoteAddr + ". Ignoring."); closeConnection(remoteAddr, channel); return; } assert msgObject instanceof ObjMessageIF : "Received a message that is not a ObjMessageIF."; // log.warn("msgObj=" + msgObject.toString()); ObjMessageIF msg = (ObjMessageIF) msgObject; performCallback(msg, timestamp); // Prepare the buffer for more reading buffer.position(buffer.limit()); buffer.limit(buffer.capacity()); } } }); } else { log.error("We received a readable callback for a key that is not readable. 
Bug?"); } return true; // log.debug("(after reading) buffer.remaining()=" + buffer.remaining()); } private void receiveHello(final CB0 cb) { if (remoteAddr == null && buffer.position() >= 8) { log.debug("Receiving hello msg."); buffer.flip(); byte[] addrByteArray = new byte[4]; buffer.get(addrByteArray); int port = buffer.getInt(); // log.debug("Resolving: " + NetUtil.byteIPAddrToString(addrByteArray)); resolvingAddr = true; AddressFactory.createResolved(addrByteArray, port, new CB1<AddressIF>() { protected void cb(CBResult result, AddressIF resolvedAddr) { resolvingAddr = false; remoteAddr = (NetAddress) resolvedAddr; log.debug("Received remoteAddr=" + remoteAddr); WriteConnHandler writeConnHandler = null; // Be careful because we might already know about a loopback // connection if (destConnectionPool.containsKey(remoteAddr) && !remoteAddr.equals(localNetAddress)) { // log.debug("connectionPool.keySet()=" + // POut.toString(channelPool.keySet())); log.debug("destConnectionPool.keySet()=" + POut.toString(destConnectionPool.keySet())); log.warn("Received another connection from remoteAddr=" + remoteAddr); // log.debug(" destConnectionSet=" + // POut.toString(destConnectionPool)); WriteConnHandler oldWriteConnHandler = (WriteConnHandler) destConnectionPool.get(remoteAddr); if (localNetAddress.getIntIPAddr() > remoteAddr.getIntIPAddr() && checkConnection(oldWriteConnHandler)) { log.warn("Rejecting new connection."); ReadConnHandler.this.deregister(); ReadConnHandler.this.destruct(); try { channel.close(); } catch (IOException e) { log.error("Could not close channel: " + e); } return; } log.warn("Closing old connection."); List<ObjRequest> objRequests = oldWriteConnHandler.getRequests(); closeConnection(remoteAddr, oldWriteConnHandler.getChannel()); writeConnHandler = getWriteConnHandler(channel, remoteAddr); if (!objRequests.isEmpty()) { log.debug("Old connection has outstanding requests"); writeConnHandler.addRequests(objRequests); writeConnHandler.register(); } } 
else { writeConnHandler = getWriteConnHandler(channel, remoteAddr); } int limit = buffer.limit(); buffer.compact(); buffer.position(limit - 8); cb.call(CBResult.OK()); } }); } else { cb.call(CBResult.OK()); } } @SuppressWarnings("unchecked") protected void performCallback(final ObjMessageIF msg, final long timestamp) { //log.debug("msg=" + msg); outstandingCBs++; // Is this a response message? if (!msg.isResponse()) { List<ObjCommCB> callbacks = callbackTable.get(msg.getClass()); if (callbacks == null) { log.warn("No callback registered for message class=" + msg.getClass() + ". Ignoring."); return; } // log.info("numCBs=" + callbacks.size() + " outstandingCBs=" + // outstandingCBs); LoopIt<ObjCommCB> cbLoop = new LoopIt<ObjCommCB>("ObjCommCBLoop", callbacks, new CB2<ObjCommCB, CB0R<Boolean>>() { protected void cb(CBResult result, final ObjCommCB objCommCB, final CB0R<Boolean> cbRecursion) { //log.debug("Performing cb for msg=" + msg); //log.debug(" remoteAddr=" + remoteAddr); //log.debug(" objCommCB=" + objCommCB); objCommCB.call(CBResult.OK(), msg, remoteAddr, timestamp, new CB1<Boolean>( CB_TIMEOUT) { protected void cb(CBResult result, Boolean handled) { switch (result.state) { case OK: { if (handled) { outstandingCBs--; // log.debug("Message handled by callback."); } else { log.warn("Message not handled by callback. Continuing..."); if (!cbRecursion.call(result)) { log.error("No valid callback found for msg=" + msg + " remoteAddr=" + remoteAddr); } } break; } case TIMEOUT: case ERROR: { log.error("Call for ObjComm msg failed: cb=" + objCommCB + " msg=" + msg + " remoteAddr=" + remoteAddr + " result=" + result); break; } } } }); } }); cbLoop.execute(); } else { long msgId = msg.getMsgId(); assert msgId > 0 : "msgId must be positive. msgId=" + msgId; /* * Is this a response message? */ if (!msg.isError()) { // Perform a callback for this response message. 
// log.debug("Performing cb for r/r msg=" + msg); // log.debug(" remoteAddr=" + remoteAddr + " msg.id=" + msgId); final ObjCommRRCB<ObjMessageIF> callback = requestResponseCBTable.remove(msgId); // log.debug("requestResponseCBTable=" + // POut.toString(requestResponseCBTable)); requestErrorCBTable.remove(msgId); if (callback == null) { log.error("No response callback registered for msg=" + msg + " with msgId=" + msgId + " from " + remoteAddr + ". Duplicate response msg? requestResponseCBTable=" + POut.toString(requestResponseCBTable)); } // log.debug("callback.hasTimeout=" + callback.hasTimeout()); EL.get().registerTimerCB(new CB0("ObjCommRRCB") { protected void cb(CBResult result) { if (!callback.isCancelled()) { outstandingCBs--; callback.call(CBResult.OK(), msg, remoteAddr, timestamp); } else { log.warn("Callback cb=" + callback + " already cancelled"); outstandingCBs--; } } }); } else { /* * Perform a callback for this error message. */ // log.debug("Performing callback for request/error msg=" + msg + " // from remoteAddr=" + remoteAddr); final ObjCommRRCB<ObjMessageIF> callback = requestErrorCBTable.remove(msgId); // log.debug("requestErrorCBTable=" + // POut.toString(requestResponseCBTable)); requestResponseCBTable.remove(msgId).cancel(); if (callback == null) { log.error("No error callback registered for msg=" + msg + " with msgId=" + msgId + " from " + remoteAddr + ". 
requestResponseCBTable=" + POut.toString(requestResponseCBTable)); } EL.get().registerTimerCB(new CB0("OBJCommRRCB") { protected void cb(CBResult result) { callback.call(CBResult.OK(), msg, remoteAddr, timestamp); outstandingCBs--; } }); } } } protected Object unmarshallMsg(byte[] msgArray) { Object msg = NetUtil.deserializeObject(msgArray); // log.debug("message=" + msg); return msg; } } /* * Callback to write data from an existing connection */ class WriteConnHandler extends TCPComm.WriteConnHandler { private final Log log = new Log(WriteConnHandler.class); private List<ObjRequest> objRequestList = new LinkedList<ObjRequest>(); WriteConnHandler(SelectableChannel channel, NetAddress remoteAddr) { super(channel, remoteAddr); } public Boolean cb(CBResult result, SelectionKey key) { super.cb(result, key); if (key.isValid() && key.isWritable()) { while (handleNextRequest()) { /* empty */ } log.debug("Writing. remoteAddr=" + remoteAddr + ". buffer.position()=" + buffer.position()); SocketChannel socketChannel = (SocketChannel) channel; buffer.flip(); int count = 0; int writeCount = 0; //do { try { writeCount = socketChannel.write(buffer); count += writeCount; //log.debug("Written count=" + writeCount + " bytes. buffer=" + buffer); } catch (IOException e) { log.error("Error writing to socket: " + e + " remoteAddr=" + remoteAddr + " writeCount=" + count + " totalCount=" + count + " key=" + key + " channel=" + channel + " buffer=" + buffer + " destCP=" + POut.toString(destConnectionPool)); return true; } //} while (writeCount != 0 && buffer.remaining() > 0); int limit = buffer.limit(); // log.debug("limit=" + limit); buffer.compact(); buffer.position(limit - count); if (buffer.position() == 0 && !hasRequests()) { log.debug("Write buffer is empty."); deregister(); } else { log.debug("(after) buffer.position()=" + buffer.position()); } } else { log.error("We received a writable callback for a key that is not writable. 
Bug?"); } return true; } protected void addRequest(ObjRequest objRequest) { objRequestList.add(objRequest); //while (handleNextRequest()) { /* empty */ } // Register this write handler for comm callback register(); } protected void addRequests(List<ObjRequest> objRequests) { objRequestList.addAll(objRequests); //while (handleNextRequest()) { /* empty */ } // Register this write handler for comm callback register(); } protected boolean hasRequests() { return !objRequestList.isEmpty(); } protected List<ObjRequest> getRequests() { return objRequestList; } private boolean handleNextRequest() { //log.warn("handleNextRequest BEGIN l=" + POut.toString(objRequestList)); if (objRequestList.isEmpty()) { //log.warn("handlNextRequest END"); return false; } final ObjRequest objRequest = objRequestList.get(0); if (sendMessage(objRequest.msgArray, objRequest.bestEffort)) { // log.debug("Handling next obj request."); objRequestList.remove(0); EL.get().registerTimerCB(new CB0() { protected void cb(CBResult result) { objRequest.cbSent.call(CBResult.OK()); } }); //log.warn("handlNextRequest END"); return true; } //log.warn("handlNextRequest END"); return false; } protected boolean sendMessage(final byte[] msgArray, boolean bestEffort) { int totalMsgSize = msgArray.length + Integer.SIZE; //log.debug("totalMsgSize=" + totalMsgSize); if (buffer.capacity() < totalMsgSize) { log.warn("Send buffer not large enough to contain entire message with size=" + totalMsgSize + ". buffer=" + buffer + ". 
Extending..."); buffer = PUtil.extendByteBuffer(buffer, buffer.capacity() + (totalMsgSize - buffer.remaining())); buffer.limit(buffer.capacity()); //log.debug("new buffer=" + buffer); } if (buffer.remaining() < totalMsgSize) return false; /* * if (buffer.remaining() < totalMsgSize) { log.warn("Send buffer is full * when sending msg=" + msg + " to remoteAddr=" + remoteAddr + " * msgArray.size=" + msgArray.length + Integer.SIZE + " buffer=" + * buffer); * * if (bestEffort) { log.warn("Best effort: Dropping message."); return; } * else { log.warn("Retrying send in " + SEND_RETRY_DELAY + " ms..."); } * * EventLoop.get().registerTimerEvent(SEND_RETRY_DELAY, new EventCB() { * private int retryAttempt; * * @Override protected void cb(CBResult result, Event timerEvent) { if * (retryAttempt < MAX_SEND_RETRIES) { retryAttempt++; sendMessage(msg, * false, cbSent); } else { log.warn("Dropping message. Could not send * msg=" + msg + ". remoteAddr=" + remoteAddr); } } }); return; } */ // log.debug("(before) buffer.position()=" + buffer.position()); // log.debug("Added msg=" + msg); // log.debug(" writeConnhandler with " + msgArray.length + " bytes"); buffer.putInt(msgArray.length); buffer.put(msgArray); // log.debug("(after) buffer.position()=" + buffer.position()); /* * TODO This is not entirely correct because we should only confirm the * transmission of a message after it has been put on the wire... 
*/ // Confirm the transmission of the message return true; } protected void addHelloMsg() { buffer.put(localNetAddress.getInetSocketAddress().getAddress().getAddress()); buffer.putInt(localNetAddress.getPort()); } public String toString() { return super.toString() + "/" + POut.toString(objRequestList); // ANSI.color(Color.LIGHTRED, // String.valueOf(objRequestList.size())); } } class ObjRequest { byte[] msgArray; boolean bestEffort; CB0 cbSent; ObjRequest(byte[] msgArray, boolean bestEffort, CB0 cbSent) { this.msgArray = msgArray; this.bestEffort = bestEffort; this.cbSent = cbSent; } public String toString() { return "ObjRequest: msg.length=" + msgArray.length; } } public static void main(String[] args) { log.main("Testing object communication"); ANSI.use(true); EL.set(new EL()); final ObjCommIF objComm = new ObjComm(); objComm.initServer(AddressFactory.createLocalhost(7777), new CB0() { protected void cb(CBResult result) { switch (result.state) { case OK: { final ObjMessageIF myMsg = new ObjMessage() { static final long serialVersionUID = 1000000001L; public int myfield; }; AddressFactory.createResolved("sb07.eecs.harvard.edu", 80, new CB1<AddressIF>() { protected void cb(CBResult result, AddressIF destAddr) { log.main("Sending message..."); objComm.sendRequestMessage(myMsg, destAddr, new ObjCommRRCB<ObjMessage>() { protected void cb(CBResult result, ObjMessage responseMessage, AddressIF remoteAddr, Long ts) { switch (result.state) { case OK: { log.main("Everything ok"); log.main("responseMessage=" + responseMessage); break; } case TIMEOUT: case ERROR: { log.main("Failed: " + result.state + " " + result.what); break; } } } }); } }); break; } case TIMEOUT: case ERROR: { log.error(result.toString()); break; } } } }); EL.get().exit(); EL.get().main(); } } --- NEW FILE: ObjMessageIF.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Jan 6, 2005 */ package edu.harvard.syrah.sbon.async.comm.obj; 
import java.io.Serializable; /** * * This interface represents a message that can be sent by the Comm layer. * */ public interface ObjMessageIF extends Serializable { public boolean hasMsgId(); public long getMsgId(); void setMsgId(long msgId); boolean isResponse(); void setResponse(boolean isResponse); boolean isError(); void setError(boolean isError); } |
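Editorial note: sendMessage() in the WriteConnHandler above frames each message as a 4-byte length (buffer.putInt(msgArray.length)) followed by the serialized bytes. The sketch below shows that framing in isolation, together with one possible read side based on flip()/compact(). The class name FramingSketch and both method names are hypothetical, and the reader is an assumption since the committed read path is only partly visible here. One detail worth knowing: Integer.SIZE is a bit count (32), so the committed check "msgArray.length + Integer.SIZE" reserves 32 bytes of headroom rather than the 4 bytes the prefix occupies. That makes the check conservative, not incorrect.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not part of the AsyncJ sources.
final class FramingSketch {
    // Size of the length prefix in bytes. Integer.SIZE counts bits, hence the division.
    static final int PREFIX_BYTES = Integer.SIZE / Byte.SIZE;

    // Appends one length-prefixed frame if it fits; otherwise leaves the buffer untouched.
    static boolean putFrame(ByteBuffer buffer, byte[] payload) {
        if (buffer.remaining() < PREFIX_BYTES + payload.length) {
            return false;
        }
        buffer.putInt(payload.length);
        buffer.put(payload);
        return true;
    }

    // Extracts every complete frame from a buffer that is in "write mode".
    // A trailing partial frame is kept in the buffer for the next call.
    static List<byte[]> takeFrames(ByteBuffer buffer) {
        List<byte[]> frames = new ArrayList<>();
        buffer.flip();
        while (buffer.remaining() >= PREFIX_BYTES) {
            buffer.mark();
            int length = buffer.getInt();
            if (length < 0) {
                throw new IllegalStateException("Negative frame length: " + length);
            }
            if (buffer.remaining() < length) {
                buffer.reset(); // rewind to the start of the incomplete frame
                break;
            }
            byte[] payload = new byte[length];
            buffer.get(payload);
            frames.add(payload);
        }
        buffer.compact(); // move any partial frame to the front, back to write mode
        return frames;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        putFrame(buffer, "hello".getBytes(StandardCharsets.UTF_8));
        for (byte[] frame : takeFrames(buffer)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
    }
}
```

Because compact() leaves the position just past any unread bytes, the same buffer can be handed straight back to a channel read without further bookkeeping.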
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:41
|
Update of /cvsroot/pyxida/AsyncJ/lib In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21526/lib Added Files: xmlrpc-2.0-a1-dev.jar Log Message: Initial commit of AsyncJ project needed for Pyxida --- NEW FILE: xmlrpc-2.0-a1-dev.jar --- (This appears to be a binary file; contents omitted.) |
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:40
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/http In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21526/src/edu/harvard/syrah/sbon/async/comm/http Added Files: HTTPCallbackHandler.java HTTPComm.java HTTPCB.java HTTPCommIF.java Log Message: Initial commit of AsyncJ project needed for Pyxida --- NEW FILE: HTTPCallbackHandler.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jul 15, 2005 */ package edu.harvard.syrah.sbon.async.comm.http; import edu.harvard.syrah.sbon.async.CallbacksIF.CB2; import edu.harvard.syrah.sbon.async.CallbacksIF.CB4; import edu.harvard.syrah.sbon.async.comm.AddressIF; /** * Handles an HTTP request * * @params remoteAddress * @params URI * @params requestData * * @returns contentType * @returns responseData * * @author peter * */ public abstract class HTTPCallbackHandler extends CB4<AddressIF, String, String, CB2<String, byte[]>> { /* empty */ } --- NEW FILE: HTTPComm.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jul 13, 2005 */ package edu.harvard.syrah.sbon.async.comm.http; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URL; import java.nio.InvalidMarkException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.*; [...1181 lines suppressed...] 
case OK: { POut.p("resultCode=" + resultCode + " httpResponse=" + httpResponse + " httpData='" + httpData + "'"); break; } case TIMEOUT: case ERROR: { log.warn(result.toString()); break; } } } }); EL.get().exit(); EL.get().main(); } } --- NEW FILE: HTTPCB.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 11, 2005 */ package edu.harvard.syrah.sbon.async.comm.http; import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.CallbacksIF.CB3; /** * */ public abstract class HTTPCB extends CB3<Integer, String, String> { public HTTPCB() { super(); } public HTTPCB(long timeout) { super(timeout); } protected abstract void cb(CBResult result, Integer resultCode, String requestResponse, String httpData); } --- NEW FILE: HTTPCommIF.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jul 13, 2005 */ package edu.harvard.syrah.sbon.async.comm.http; import edu.harvard.syrah.sbon.async.comm.TCPCommIF; public interface HTTPCommIF extends TCPCommIF { public void checkHTTP(String url, boolean keepAlive, HTTPCB cbHTTPResponse); public void sendHTTPRequest(String url, String httpRequest, boolean keepAlive, HTTPCB cbHTTPResponse); /* * This method sends an HTTP request for a stream. It assumes that the stream uses chunked encoding. */ public void sendHTTPStreamRequest(String url, String httpRequest, HTTPCB cbHTTPResponse); public void registerHandler(String path, HTTPCallbackHandler httpRequestHandler); public void deregisterXMLRPCHandler(String path); } |
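Editorial note: HTTPCallbackHandler above is a callback that receives the remote address, the URI and the request data, and answers through a second callback carrying a content type and the response bytes. HTTPCommIF.registerHandler() binds such a handler to a path. The sketch below illustrates that shape with stand-in types only. Handler, Reply, HandlerRegistrySketch, deregisterHandler and dispatch are all hypothetical names, and the exact-match lookup on the path is an assumption, because the body of HTTPComm.java is suppressed in this message.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not part of the AsyncJ sources.
final class HandlerRegistrySketch {
    // Stand-in for CB2<String, byte[]>: receives contentType and responseData.
    interface Reply {
        void send(String contentType, byte[] responseData);
    }

    // Stand-in for HTTPCallbackHandler: remoteAddress, URI, requestData, reply callback.
    interface Handler {
        void handle(String remoteAddress, String uri, String requestData, Reply reply);
    }

    private final Map<String, Handler> handlers = new HashMap<>();

    void registerHandler(String path, Handler handler) {
        handlers.put(path, handler);
    }

    void deregisterHandler(String path) {
        handlers.remove(path);
    }

    // Returns false when no handler is bound to the URI (assumed: exact match only).
    boolean dispatch(String remoteAddress, String uri, String requestData, Reply reply) {
        Handler handler = handlers.get(uri);
        if (handler == null) {
            return false;
        }
        handler.handle(remoteAddress, uri, requestData, reply);
        return true;
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.registerHandler("/status", (addr, uri, data, reply) ->
            reply.send("text/plain", "ok".getBytes(StandardCharsets.UTF_8)));
        registry.dispatch("127.0.0.1", "/status", "", (contentType, body) ->
            System.out.println(contentType + ": " + new String(body, StandardCharsets.UTF_8)));
    }
}
```

Passing the reply as a callback, rather than returning a value, lets a handler answer later from another event-loop callback, which matches the asynchronous style used throughout these sources.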
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:40
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21526/src/edu/harvard/syrah/sbon/async/comm Added Files: CommIF.java AddressIF.java TCPCommIF.java NetAddress.java AddressFactory.java SimAddr.java UDPCommIF.java UDPCommCB.java UDPComm.java Comm.java TCPComm.java Log Message: Initial commit of AsyncJ project needed for Pyxida --- NEW FILE: TCPComm.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 9, 2005 */ package edu.harvard.syrah.sbon.async.comm; import java.io.IOException; import java.net.ServerSocket; import java.nio.channels.*; import java.util.HashMap; import java.util.Map; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.prp.POut; import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.Config; import edu.harvard.syrah.sbon.async.EL; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; /** * Abstract class that is the base for all communication layer implementations. It handles the * creation and acceptance of connections but doesn't specify any strategy for reading and * writing. 
*/ abstract public class TCPComm extends Comm implements TCPCommIF { protected static final Log log = new Log(TCPComm.class); protected static final int CONNECTION_RETRIES = Integer.valueOf(Config.getConfigProps().getProperty( "sbon.connection.retries", "3")); protected static final long STATE_DUMP_INTERVAL= Long.valueOf(Config.getConfigProps().getProperty( "sbon.comm.statedump", "0")); // Keep track of pending connections protected Map<NetAddress, Comm.ConnectConnHandler> pendingConnectionPool = new HashMap<NetAddress, Comm.ConnectConnHandler>(); // Destinations to which we maintain connections public Map<NetAddress, Comm.WriteConnHandler> destConnectionPool = new HashMap<NetAddress, Comm.WriteConnHandler>(); // Stores timeout handlers for outgoing connections protected Map<SelectableChannel, TimeoutHandler> timeoutPool = new HashMap<SelectableChannel, TimeoutHandler>(); protected TCPComm() { super(); if (STATE_DUMP_INTERVAL != 0) { EL.get().registerTimerCB(STATE_DUMP_INTERVAL, new CB0("TCPComm-StateDumper") { protected void cb(CBResult result) { log.main("Comm: " + TCPComm.this.getClass().getSimpleName()); log.main("destConnectionPool.size=" + destConnectionPool.size() + " destConnectionPool=" + POut.toString(destConnectionPool.keySet())); log.main("pendingConnectionPool.size=" + pendingConnectionPool.size() + " pendingConnectionPool=" + POut.toString(pendingConnectionPool.keySet())); log.main("timeoutPool.size=" + timeoutPool.size() + " timeoutPool=" + POut.toString(timeoutPool.keySet())); if (!EL.get().shouldExit()) EL.get().registerTimerCB(STATE_DUMP_INTERVAL, this); } }); } } public Comm.ConnectConnHandler createConnection(AddressIF destAddr, boolean reuseConnections, long timeout, CB0 cbHandler) { log.debug("Creating a new TCP connection to destAddr=" + destAddr); assert destAddr instanceof NetAddress; NetAddress remoteAddr = (NetAddress) destAddr; if (remoteAddr.isWildcard()) log.error("Cannot create a connection to wildcard addres. 
remoteAddr=" + remoteAddr); if (!remoteAddr.isResolved()) log.error("Cannot create a connection to an unresolved address. remoteAddr=" + remoteAddr); //log.debug("connectionPool.keySet()=" + Util.toString(connectionPool.keySet())); //log.debug("pendingConnectionPool.keySet()=" + POut.toString(pendingConnectionPool.keySet())); Comm.WriteConnHandler writeConnHandler = destConnectionPool.get(remoteAddr); // Do we have an existing connection? if (writeConnHandler == null || (!reuseConnections)) { log.debug("No Existing connection to destAddr=" + destAddr); Comm.ConnectConnHandler connectConnHandler = pendingConnectionPool.get(remoteAddr); if (connectConnHandler == null || (!reuseConnections)) { log.debug("Connecting with TCP to remoteAddr=" + remoteAddr); connectConnHandler = getConnectConnHandler(remoteAddr, timeout, cbHandler); pendingConnectionPool.put(remoteAddr, connectConnHandler); return connectConnHandler; } log.debug("Waiting for pending connection to destAddr=" + remoteAddr); return connectConnHandler; } log.error("Connection to destAddr=" + remoteAddr + " already exists."); return null; } public void closeConnection(AddressIF remoteAddr, SelectableChannel channel) { assert channel != null; Comm.WriteConnHandler writeConnHandler = null; if (remoteAddr != null) { writeConnHandler = destConnectionPool.remove(remoteAddr); } // Has the connection been already closed by us? 
if (writeConnHandler != null) { log.debug("Closing channel=" + channel); } if (timeoutPool.containsKey(channel)) timeoutPool.remove(channel).deregister(); super.closeConnection(channel); } public boolean checkConnection(Comm.WriteConnHandler writeConnHandler) { if (!writeConnHandler.channel.isOpen()) { log.warn("Checking connection and channel=" + writeConnHandler.channel + " has been closed"); closeConnection(writeConnHandler.remoteAddr, writeConnHandler.channel); return false; } return true; } protected TimeoutHandler createTimeoutHandler(SelectableChannel channel, AddressIF remoteAddr, long timeout) { return new TimeoutHandler(channel, remoteAddr, timeout); } /* * Callback to accept a new connection */ protected abstract class AcceptConnHandler extends Comm.AcceptConnHandler { private final Log log = new Log(AcceptConnHandler.class); protected AcceptConnHandler(AddressIF bindAddress, CB0 cbHandler) { super(cbHandler); assert bindAddress instanceof NetAddress; NetAddress bindNetAddress = (NetAddress) bindAddress; try { // Create the server socket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.bind(bindNetAddress.getInetSocketAddress()); serverSocketChannel.configureBlocking(false); this.channel = serverSocketChannel; this.setCommCB(); // Register a callback with the main event loop when a new connection is created this.register(); } catch (ClosedChannelException e1) { log.warn("Could not register interest with closed channel: " + e1); cbHandler.call(CBResult.ERROR(e1.toString())); return; } catch (IOException e2) { log.warn("Could not create server socket: " + e2); cbHandler.call(CBResult.ERROR(e2.toString())); return; } if (cbHandler != null) cbHandler.callOK(); } public Boolean cb(CBResult result, SelectionKey key) { if (key.isValid() && key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); try { channel = 
serverSocketChannel.accept(); channel.configureBlocking(false); log.debug("Accepting. channel=" + channel); } catch (IOException e) { log.warn("Could not accept new connection: " + e); } } else { log.error("We received an acceptable callback for a key that is not acceptable. Bug?"); } return true; } } /* * Callback for connecting a new connection */ protected abstract class ConnectConnHandler extends Comm.ConnectConnHandler { private final Log log = new Log(ConnectConnHandler.class); private int retryCounter = 1; private long timeout; protected ConnectConnHandler(NetAddress destAddr, long timeout, CB0 cbHandler) { super(destAddr, cbHandler); this.timeout = timeout; doConnect(); } public Boolean cb(CBResult result, SelectionKey key) { if (key.isValid() && key.isConnectable()) { channel = key.channel(); SocketChannel socketChannel = (SocketChannel) channel; log.debug("Finishing new connection to remoteAddr=" + remoteAddr); try { // Try to finish the connection socketChannel.finishConnect(); } catch (final IOException e) { log.debug("Connection failed for remoteAddr=" + remoteAddr + ": " + e + " retry=" + retryCounter); //bugWorkaround(); this.destruct(); if (retryCounter < CONNECTION_RETRIES) { retryCounter++; doConnect(); } else { log.debug("Failed trying to establish the connection to remoteAddr=" + remoteAddr); pendingConnectionPool.remove(remoteAddr); EL.get().registerTimerCB(new CB0() { protected void cb(CBResult resultOK) { cbHandler.call(CBResult.ERROR("remoteAddr=" + remoteAddr.toString(false) + ": " + e.getMessage())); } }); } return true; } log.debug("Connected with TCP to remoteAddr=" + remoteAddr); pendingConnectionPool.remove(remoteAddr); if (timeout != 0) { createTimeoutHandler(channel, remoteAddr, timeout); } bugWorkaround(); this.destruct(); log.debug("Finished"); } else { log.error("Received a connectable CB for a key that is not connectable. 
Bug?"); } return true; } private void doConnect() { try { channel = SocketChannel.open(); this.setCommCB(); SocketChannel socketChannel = (SocketChannel) channel; socketChannel.configureBlocking(false); socketChannel.connect(remoteAddr.getInetSocketAddress()); } catch (IOException e) { log.warn("Could not connect to remoteAddr=" + remoteAddr + ": " + e); } register(); } private void bugWorkaround() { /* * Here we need to explicitly de-register the OP_CONNECT * interest, otherwise we will constantly be getting callbacks. * * This is BUG #4960791 in the Sun JDK bug database. */ deregister(); } } protected abstract class ReadConnHandler extends Comm.ReadConnHandler { protected ReadConnHandler(SelectableChannel channel, NetAddress remoteAddr) { super(channel, remoteAddr); } protected Boolean cb(CBResult result, SelectionKey key) { if (timeoutPool.containsKey(channel)) { timeoutPool.get(channel).touchConnection(); } return true; } } protected abstract class WriteConnHandler extends Comm.WriteConnHandler { protected WriteConnHandler(SelectableChannel channel, NetAddress remoteAddr) { super(channel, remoteAddr); } protected Boolean cb(CBResult result, SelectionKey key) { if (timeoutPool.containsKey(channel)) { timeoutPool.get(channel).touchConnection(); } return true; } } /** * * This handler runs periodically and closes connection that have not been used for at least * CONNECTION_TIMEOUT. * * TODO: There appears to be a bug where we eventually run out of file descriptors after a long time. * It may be that the code below is not working properly...? 
* */ public class TimeoutHandler extends CB0 { private final Log log = new Log(TimeoutHandler.class); protected SelectableChannel channel; protected AddressIF remoteAddr; private long timeout; private boolean touchedConnection = false; private CB0 cbTimer; public TimeoutHandler(SelectableChannel channel, AddressIF remoteAddr, long timeout) { log.debug("New timeouthandler: channel=" + channel); this.channel = channel; this.remoteAddr = remoteAddr; this.timeout = timeout; this.cbTimer = EL.get().registerTimerCB(timeout, this); timeoutPool.put(channel, this); } void touchConnection() { //log.debug("Touching connection: remoteAddr=" + remoteAddr); touchedConnection = true; } void deregister() { if (cbTimer != null) { log.debug("Deregistering the timeout handler"); EL.get().deregisterTimerCB(cbTimer); } } protected void timeoutConnection() { log.debug("Connection with remoteAddr=" + remoteAddr + " has expired. Closing."); closeConnection(remoteAddr, channel); } public void cb(CBResult result) { cbTimer = null; log.debug("destConnectionPool.keySet()=" + destConnectionPool.keySet()); if (!touchedConnection) { if (!EL.get().checkChannelState(channel, SelectionKey.OP_READ)) { timeoutConnection(); } else { log.debug("Cannot close. 
Read outstanding."); } return; } log.debug("Channel=" + channel + " is still active."); touchedConnection = false; cbTimer = EL.get().registerTimerCB(timeout, this); } public String toString() { return "TCPTimeout"; } } } --- NEW FILE: UDPCommCB.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 19, 2005 */ package edu.harvard.syrah.sbon.async.comm; import java.nio.ByteBuffer; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.async.CallbacksIF.CB4; /** * */ public abstract class UDPCommCB extends CB4<ByteBuffer, AddressIF, Long, CB1<Boolean>> { public UDPCommCB() { super(); } public UDPCommCB(long timeout) { super(timeout); } } --- NEW FILE: Comm.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 19, 2005 */ package edu.harvard.syrah.sbon.async.comm; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.EL; import edu.harvard.syrah.sbon.async.CallbacksIF.*; /** * * Parent class for all connection-oriented communication modules * */ abstract public class Comm implements CommIF { protected static final Log log = new Log(Comm.class); // Size of the buffer used for reading/writing from/to connections (32KB) protected static final int MIN_BUFFER_SIZE = 32 * 1024; // Maximum buffer size of 1 MB protected static final int MAX_BUFFER_SIZE = 1 * 1024 * 1024; // The local address public NetAddress localNetAddress; // The handler for accepting incoming connections protected AcceptConnHandler acceptConnHandler = null; public AddressIF getLocalAddress() { assert localNetAddress != null : "local addr is null"; return localNetAddress; } public int getLocalPort() { return 
localNetAddress.getPort(); } public void setLocalAddress(AddressIF localAddress) { if (!(localAddress instanceof NetAddress)) log.error("Can only understand NetAddresses."); this.localNetAddress = (NetAddress) localAddress; } /* * @see edu.harvard.syrah.sbon.communication.CommIF#init(edu.harvard.syrah.sbon.communication.AddressIF) */ public void initServer(AddressIF bindAddress, CB0 cbInit) { if (!(bindAddress instanceof NetAddress)) log.error("Can only understand NetAddresses."); log.debug("initServer with bindAddress=" + bindAddress); NetAddress bindNetAddress = (NetAddress) bindAddress; setLocalAddress(AddressFactory.createLocal(bindNetAddress.getPort())); // Don't pass in a net handler here -- no one's interested acceptConnHandler = getAcceptConnHandler(bindNetAddress, cbInit); } public void deinitServer() { closeConnection(acceptConnHandler.channel); } protected void closeConnection(SelectableChannel channel) { assert channel != null; try { EL.get().deregisterAllCommCBs(channel); } catch (ClosedChannelException e) { log.debug("Channel already closed"); } EL.get().unsetAllCommCBs(channel); try { channel.close(); } catch (IOException e) { log.debug("Could not close connection: " + e); } } protected abstract AcceptConnHandler getAcceptConnHandler(NetAddress localAddress, CB0 cbHandler); protected abstract ConnectConnHandler getConnectConnHandler(NetAddress remoteAddress, long timeout, CB0 cbHandler); abstract class ConnHandler extends CB1R<Boolean, SelectionKey> { public SelectableChannel channel = null; public NetAddress remoteAddr; protected ConnHandler(SelectableChannel channel, NetAddress remoteAddr) { this.channel = channel; this.remoteAddr = remoteAddr; } public String toString() { return ""; } } abstract protected class AcceptConnHandler extends ConnHandler { protected CB0 cbHandler; protected AcceptConnHandler(CB0 cbHandler) { super(null, null); this.cbHandler = cbHandler; } protected void register() { try { EL.get().registerCommCB(channel, 
SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { log.error(e.toString()); } } protected void deregister() { try { EL.get().deregisterCommCB(channel, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { log.error(e.toString()); } } protected void setCommCB() { EL.get().setCommCB(channel, SelectionKey.OP_ACCEPT, this); } } abstract protected class ConnectConnHandler extends ConnHandler { protected CB0 cbHandler; protected ConnectConnHandler(NetAddress remoteAddr, CB0 cbHandler) { super(null, remoteAddr); this.cbHandler = cbHandler; } protected void register() { try { EL.get().registerCommCB(channel, SelectionKey.OP_CONNECT); } catch (ClosedChannelException e) { log.error(e.toString()); } } protected void deregister() { try { EL.get().deregisterCommCB(channel, SelectionKey.OP_CONNECT); } catch (ClosedChannelException e) { log.error(e.toString()); } } protected void destruct() { EL.get().unsetCommCB(channel, SelectionKey.OP_CONNECT); } protected void setCommCB() { EL.get().setCommCB(channel, SelectionKey.OP_CONNECT, this); } } /* * Callback to read data from an existing connection. Must be overidden. */ protected abstract class ReadConnHandler extends ConnHandler { private final Log log = new Log(ReadConnHandler.class); /* * TODO * There some problem with the buffer allocation here -- we're running out of direct * memeory. If this problem doesn't go away, these buffers must be replaced by * non-direct ones. 
*/ public ByteBuffer buffer = ByteBuffer.allocate /* Direct */ (MIN_BUFFER_SIZE); protected ReadConnHandler(SelectableChannel channel, NetAddress remoteAddr) { super(channel, remoteAddr); log.debug("Allocating new ReadConnHandler for remoteAddr=" + remoteAddr); EL.get().setCommCB(channel, SelectionKey.OP_READ, this); } public boolean hasData() { return buffer.position() != 0; } public void register() { try { EL.get().registerCommCB(channel, SelectionKey.OP_READ); } catch (ClosedChannelException e) { log.error(e.toString()); } } public void deregister() { try { EL.get().deregisterCommCB(channel, SelectionKey.OP_READ); } catch (ClosedChannelException e) { log.error(e.toString()); } } public void destruct() { EL.get().unsetCommCB(channel, SelectionKey.OP_READ); } public String toString() { return String.valueOf(buffer.capacity()); } } /* * Callback to write data from an existing connection. Must be overidden. */ protected abstract class WriteConnHandler extends ConnHandler { private final Log log = new Log(WriteConnHandler.class); protected ByteBuffer buffer = ByteBuffer.allocate /* Direct */ (MIN_BUFFER_SIZE); protected WriteConnHandler(SelectableChannel channel, NetAddress remoteAddr) { super(channel, remoteAddr); log.debug("Allocating new WriteConnHandler for remoteAddr=" + remoteAddr); EL.get().setCommCB(channel, SelectionKey.OP_WRITE, this); } public void register() { try { EL.get().registerCommCB(channel, SelectionKey.OP_WRITE); } catch (ClosedChannelException e) { log.warn(e.toString()); } } public void deregister() { assert buffer.position() == 0; try { EL.get().deregisterCommCB(channel, SelectionKey.OP_WRITE); } catch (ClosedChannelException e) { log.warn(e.toString()); } } public SelectableChannel getChannel() { return channel; } public void setChannel(SelectableChannel channel) { this.channel = channel; } public String toString() { return String.valueOf(buffer.capacity()); } } } --- NEW FILE: TCPCommIF.java --- /* * SBON * * @author Last modified by 
$Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 9, 2005 */ package edu.harvard.syrah.sbon.async.comm; /** * * Interface for a generic communication module * */ public interface TCPCommIF extends CommIF { /* empty */ } --- NEW FILE: CommIF.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 19, 2005 */ package edu.harvard.syrah.sbon.async.comm; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; /** * */ public interface CommIF { /** * Initialises the comm module with a local address. * * @param localAddress */ public void initServer(AddressIF localAddress, CB0 cbInit); public void deinitServer(); /** * Returns the address that this instance of the comm module considers to be local * @return */ public AddressIF getLocalAddress(); public void setLocalAddress(AddressIF localAddress); public int getLocalPort(); } --- NEW FILE: UDPCommIF.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 19, 2005 */ package edu.harvard.syrah.sbon.async.comm; import java.nio.ByteBuffer; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; /** * * Interface for the UDPComm module that implements UDP packet communication * */ public interface UDPCommIF extends CommIF { public void initServer(AddressIF bindAddress, CB0 cbInit); public void sendPacket(ByteBuffer data, AddressIF destAddr); public void sendPacket(ByteBuffer data, AddressIF destAddr, UDPCommCB cb); public void registerPacketCallback(UDPCommCB cb); public void deregisterPacketCallback(UDPCommCB cb); } --- NEW FILE: SimAddr.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Mar 29, 2007 */ package edu.harvard.syrah.sbon.async.comm; import java.net.InetAddress; public class SimAddr implements AddressIF { private String name; public SimAddr(String name) { 
this.name = name; } public byte[] getByteIPAddrPort() { // TODO Auto-generated method stub return null; } public String getHostname() { return name; } public InetAddress getInetAddress() { return null; } public int getIntIPAddr() { return 0; } public int getPort() { return 0; } public boolean hasPort() { return false; } public boolean isResolved() { return false; } public String toString(boolean colour) { return name; } public int compareTo(Object o) { SimAddr cmpAddr = (SimAddr) o; return name.compareTo(cmpAddr.name); } @Override public String toString() { return toString(true); } @Override public int hashCode() { return name.hashCode(); } @Override public boolean equals(Object obj) { if (!(obj instanceof SimAddr)) return false; SimAddr cmpAddr = (SimAddr) obj; return name.equals(cmpAddr.name); } } --- NEW FILE: UDPComm.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 19, 2005 */ package edu.harvard.syrah.sbon.async.comm; import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.PortUnreachableException; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.util.LinkedList; import java.util.List; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.prp.ANSI.Color; import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.EL; import edu.harvard.syrah.sbon.async.Loop; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.async.CallbacksIF.CB2; import edu.harvard.syrah.sbon.async.EL.Priority; /** * * This communication module implements communication with UDP packets * */ public class UDPComm extends Comm implements UDPCommIF { private static final Log log = new Log(UDPComm.class, Color.LIGHTYELLOW); protected List<UDPCommCB> serverCallbacks = new 
LinkedList<UDPCommCB>(); protected Priority priority; private boolean keepNanoTS = false; public UDPComm(Priority priority) { this(priority, false); } public UDPComm(Priority priority, boolean keepNanoTS) { this.priority = priority; this.keepNanoTS = keepNanoTS; } /* * @see edu.harvard.syrah.sbon.comm.UDPCommIF#sendPacket(byte[], * edu.harvard.syrah.sbon.comm.AddressIF) */ public void sendPacket(ByteBuffer byteBuffer, AddressIF remoteAddr) { log.debug("Sending UDP packet for remoteAddr=" + remoteAddr); assert remoteAddr != null : "Remote address is null"; assert remoteAddr instanceof NetAddress : "Can only send packets to NetAddresses."; NetAddress remoteNetAddress = (NetAddress) remoteAddr; DatagramChannel datagramChannel = connect(remoteNetAddress); sendPacket(datagramChannel, byteBuffer); try { datagramChannel.close(); } catch (IOException e) { log.error("Couldn't close UDP channel: " + e); } } private void sendPacket(DatagramChannel datagramChannel, ByteBuffer byteBuffer) { byteBuffer.flip(); int toSent = byteBuffer.remaining(); try { int bytesSent = datagramChannel.write(byteBuffer); assert bytesSent == toSent; } catch (IOException e) { log.error("Could not send UDP packet:" + e); } // log.debug("Finished sending."); } private DatagramChannel connect(NetAddress remoteNetAddress) { DatagramChannel datagramChannel = null; try { datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); datagramChannel.connect(remoteNetAddress.getInetSocketAddress()); } catch (IOException e) { log.error("Could not create a new DatagramChannel: " + e); } log.debug("remoteAddr=" + remoteNetAddress + " datagramChannel=" + datagramChannel); return datagramChannel; } public void sendPacket(ByteBuffer data, AddressIF destAddr, final UDPCommCB cb) { assert destAddr instanceof NetAddress; NetAddress remoteNetAddress = (NetAddress) destAddr; DatagramChannel datagramChannel = connect(remoteNetAddress); log.debug("Sending UDP packet to remoteNetAddress=" + 
remoteNetAddress); final ReadConnHandler readConnHandler = createReadConnHandler(datagramChannel, remoteNetAddress); readConnHandler.udpCommCB = cb; cb.registerCancelledCB(new CB0() { protected void cb(CBResult result) { readConnHandler.deregister(); readConnHandler.destruct(); } }); readConnHandler.register(); sendPacket(datagramChannel, data); } /* * @see edu.harvard.syrah.sbon.comm.UDPCommIF#registerPacketCallback(edu.harvard.syrah.sbon.comm.UDPCommCB) */ public void registerPacketCallback(UDPCommCB cb) { assert (!serverCallbacks.contains(cb)); serverCallbacks.add(cb); } /* * @see edu.harvard.syrah.sbon.comm.UDPCommIF#deregisterPacketCallback(edu.harvard.syrah.sbon.comm.UDPCommCB) */ public void deregisterPacketCallback(UDPCommCB cb) { assert (serverCallbacks.contains(cb)); serverCallbacks.remove(cb); log.debug("Deregistered UDPCommCB=" + cb + " callbacks.size=" + serverCallbacks.size()); } protected AcceptConnHandler getAcceptConnHandler(NetAddress localAddress, CB0 cbHandler) { return new AcceptConnHandler(localAddress, cbHandler); } protected ConnectConnHandler getConnectConnHandler(NetAddress remoteAddress, long timeout, CB0 cbHandler) { return null; /* not used */ } protected ReadConnHandler createReadConnHandler(SelectableChannel channel, NetAddress remoteAddress) { return new ReadConnHandler(channel, remoteAddress); } protected WriteConnHandler createWriteConnHandler(SelectableChannel channel, NetAddress remoteAddr) { return null; /* not used */ } /* * Callback to listen for incoming UDP packets */ class AcceptConnHandler extends Comm.AcceptConnHandler { private final Log log = new Log(AcceptConnHandler.class, Color.LIGHTYELLOW); AcceptConnHandler(AddressIF bindAddress, CB0 cbHandler) { super(cbHandler); log.debug("Listening for UDP packets..."); assert (bindAddress instanceof NetAddress); NetAddress bindNetAddress = (NetAddress) bindAddress; try { DatagramChannel datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); 
ReadConnHandler readConnHandler = createReadConnHandler(datagramChannel, bindNetAddress); readConnHandler.register(); // Create the datagram socket DatagramSocket datagramSocket = datagramChannel.socket(); datagramSocket.bind(bindNetAddress.getInetSocketAddress()); } catch (IOException e) { log.warn("Could not create server socket: " + e); cbHandler.call(CBResult.ERROR(e.toString())); return; } cbHandler.callOK(); } public Boolean cb(CBResult result, SelectionKey key) { return true; } } /* * Callback for an incoming packet */ class ReadConnHandler extends Comm.ReadConnHandler { protected final Log log = new Log(ReadConnHandler.class, Color.LIGHTYELLOW); protected UDPCommCB udpCommCB = null; ReadConnHandler(SelectableChannel channel, NetAddress destAddr) { super(channel, destAddr); } public Boolean cb(CBResult result, SelectionKey key) { if (key.isValid() && key.isReadable()) { log.debug("Reading from socket... buffer=" + buffer); // final PTimer pt = new PTimer(); channel = key.channel(); DatagramChannel datagramChannel = (DatagramChannel) channel; if (buffer.position() != 0) { log.warn("There is already data in the UDP buffer"); return false; } InetSocketAddress inetSocketAddress = null; try { inetSocketAddress = (InetSocketAddress) datagramChannel.receive(buffer); log.debug("Read count=" + buffer.position() + " bytes."); } catch (PortUnreachableException e) { log.debug("Error reading from socket. Ignoring. PortUnreachable: " + e); buffer.clear(); return true; } catch (IOException e) { log.warn("Error reading from socket. Ignoring: " + e); buffer.clear(); return true; } assert inetSocketAddress != null : "No packet found."; assert buffer.position() != 0 : "Odd. Received a selector key and only read 0 bytes. 
This is a bug?"; long newNanoTS = 0; if (keepNanoTS) newNanoTS = System.nanoTime(); final long nanoTS = newNanoTS; buffer.flip(); final ByteBuffer copyBuffer = ByteBuffer.allocate(buffer.remaining()); copyBuffer.put(buffer); copyBuffer.flip(); buffer.clear(); // pt.lap(log, "UDP (before adr res) took"); AddressFactory.createResolved(inetSocketAddress.getAddress().getAddress(), inetSocketAddress.getPort(), new CB1<AddressIF>("ResolveAddr1") { protected void cb(CBResult result, AddressIF resolvedRemoteAddr) { switch (result.state) { case OK: { // pt.lap(log, "UDP (after adr res) took"); remoteAddr = (NetAddress) resolvedRemoteAddr; log.debug("remoteAddr=" + remoteAddr + " callbacks.size=" + serverCallbacks.size()); if (udpCommCB != null) { if (channel.isOpen()) { udpCommCB.deregisterAllCancelledCBs(); deregister(); destruct(); try { channel.close(); } catch (IOException e) { log.error("Could not close the channel: " + e); } } // pt.lap(log, "UDPPacketHandling before cb timer reg // took"); EL.get().registerTimerCB( new CB0("UDPCommCB: remoteAddr=" + remoteAddr) { protected void cb(CBResult result) { // pt.stop(log, "UDPCommCB (before call) took"); udpCommCB.call(CBResult.OK(), copyBuffer, remoteAddr, nanoTS, new CB1<Boolean>() { protected void cb(CBResult result, Boolean handled) { switch (result.state) { case OK: { if (!handled) { log.warn("UDP packet from remoteAddr=" + remoteAddr + " was not handled by UDPCB."); } // pt.stop(log, "UDPCB took"); break; } case TIMEOUT: case ERROR: { log.error(result.toString()); break; } } } }); } }, priority); } else { // pt.lap(log, "UDPPacketHandling up to cb (remoteAddr=" + // remoteAddr + ") took"); UDPCommCB firstCB = serverCallbacks.get(0); Loop<UDPCommCB> cbLoop = new Loop<UDPCommCB>("UDPCBLoop", firstCB, new CB2<UDPCommCB, CB1<UDPCommCB>>() { protected void cb(CBResult result, final UDPCommCB udpCommCB, final CB1<UDPCommCB> cbRecursion) { // log.debug("UDPCommCB=" + udpCommCB); // log.debug("Trying udpCommCB.index=" + // 
serverCallbacks.indexOf(udpCommCB)); // pt.lap(log, "UDPCommCBs (before call) took"); udpCommCB.call(CBResult.OK(), copyBuffer, remoteAddr, nanoTS, new CB1<Boolean>() { protected void cb(CBResult result, Boolean handled) { switch (result.state) { case OK: { if (handled) { // log.debug("Packet handled by // callback."); // pt.stop(log, "UDPPacketHandling // (remoteAddr=" + remoteAddr + ") // took"); } else { copyBuffer.position(0); // log.debug("Packet not handled by // callback. Continuing..."); int cbIndex = serverCallbacks.indexOf(udpCommCB); // log.debug("Tried cbIndex=" + // cbIndex); if ((cbIndex + 1) >= serverCallbacks.size()) { log.error("No valid callback found for UDP packet. remoteAddr=" + remoteAddr + ". buffer=" + copyBuffer); } else { UDPCommCB nextCB = serverCallbacks.get(cbIndex + 1); cbRecursion.call(result, nextCB); } } break; } case TIMEOUT: case ERROR: { log.error(result.toString()); break; } } } }); } }, priority); cbLoop.execute(); } break; } case TIMEOUT: case ERROR: { log.error(result.toString()); break; } } } }); } return true; } } } --- NEW FILE: NetAddress.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 7, 2005 */ package edu.harvard.syrah.sbon.async.comm; import java.io.Serializable; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; import edu.harvard.syrah.prp.ANSI; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.prp.NetUtil; import edu.harvard.syrah.prp.POut; import edu.harvard.syrah.prp.ANSI.Color; import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.EL; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; /** * * This is an Internet address and a port number as used by the Comm module. 
* * */ public class NetAddress implements AddressIF, Serializable { protected static final Log log = new Log(NetAddress.class); static final long serialVersionUID = 1000000001; private static final Color ADDR_COLOR = Color.CYAN; static final int UNKNOWN_PORT = -1; protected String hostname; protected byte[] byteIPAddr; protected int port; protected InetSocketAddress inetSocketAddress; protected InetAddress inetAddress; private boolean unresolvable = false; private boolean isWildcardAddress = false; NetAddress(String hostString, CB1<AddressIF> cbAddress) { this(hostString, null, UNKNOWN_PORT, cbAddress); } NetAddress(String hostString, int port, CB1<AddressIF> cbAddress) { this(hostString, null, port, cbAddress); } NetAddress(int port) throws UnknownHostException { this(InetAddress.getLocalHost().getCanonicalHostName(), InetAddress.getLocalHost().getAddress(), port, null); } NetAddress(final String hostString, byte[] byteIPAddr, final int port, final CB1<AddressIF> cbAddress) { log.debug("hostString=" + hostString + " byteIPAddr=" + POut.toString(byteIPAddr) + " port=" + port); this.byteIPAddr = byteIPAddr; this.port = port; if (hostString != null) parseHostString(hostString); if ((this.port <= 0) || (this.port > 65535)) { log.debug("Missing or invalid port: " + port + " with hostString=" + hostString); this.port = UNKNOWN_PORT; } // Is this an asynchronous call with DNS resolution? 
if (cbAddress != null) { resolve(new CB0() { protected void cb(CBResult result) { switch (result.state) { case OK: { try { if (NetAddress.this.port != UNKNOWN_PORT) { NetAddress.this.inetSocketAddress = new InetSocketAddress( InetAddress.getByAddress(NetAddress.this.byteIPAddr), NetAddress.this.port); } else { log.debug("Created address without port: hostname=" + hostname); NetAddress.this.inetAddress = InetAddress.getByAddress(hostname, NetAddress.this.byteIPAddr); } } catch (UnknownHostException e) { String ip = null; if (NetAddress.this.byteIPAddr != null) ip = NetUtil.byteIPAddrToString(NetAddress.this.byteIPAddr); log.error("Resolved address incorrectly. host=" + hostString + " ip=" + ip + " port=" + NetAddress.this.port + ": " + e); } break; } case TIMEOUT: case ERROR: { log.debug("Could not resolve address=" + hostname + " result=" + result); break; } } cbAddress.call(result, NetAddress.this); } }); } else { if (this.byteIPAddr == null) { log.debug("No IP address found."); this.unresolvable = true; return; } AddressFactory.getDNSComm().updateCache(hostname, this.byteIPAddr); try { log.debug("NetUtil.byteIPToIntIP(this.byteIPAddr)=" + NetUtil.byteIPToIntIP(this.byteIPAddr)); log.debug("byteIPAddr=" + POut.toString(this.byteIPAddr)); // Is this the wildcard address? 
if (NetUtil.byteIPToIntIP(this.byteIPAddr) != 0) { if (port != UNKNOWN_PORT) { this.inetSocketAddress = new InetSocketAddress(InetAddress.getByAddress(this.byteIPAddr), this.port); log.debug("inetSocketAddress=" + inetSocketAddress); } else { this.inetAddress = InetAddress.getByAddress(hostname, byteIPAddr); } } else { log.debug("Creating a server socket with a wildcard address"); this.isWildcardAddress = true; this.inetSocketAddress = new InetSocketAddress(this.port); } } catch (UnknownHostException e) { log.error("This should never happen: " + e); } } } private void parseHostString(String hostString) { // log.debug("Parsing hostString=" + hostString); String hostPart = null; if (hostString.contains(":")) { String[] splitString = hostString.split(":"); hostPart = splitString[0]; // Set the port number String portString = splitString[1]; this.port = Integer.valueOf(portString); } else { hostPart = hostString; } if (hostPart.matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) { // This is an IP address this.byteIPAddr = NetUtil.stringIPToByteIP(hostPart); log.debug("Found an IP address=" + hostPart + " byteIPAddr=" + POut.toString(this.byteIPAddr)); // log.debug("String IP=" + NetUtil.byteIPAddrToString(this.byteIPAddr)); // Is this the local address? if (hostPart.equals("127.0.0.1")) { this.hostname = "localhost"; } } else { // This is a hostname this.hostname = hostPart; // Is this the local host? 
if (hostname.toLowerCase().equals("localhost")) { log.debug("Localhost address found"); this.byteIPAddr = NetUtil.stringIPToByteIP("127.0.0.1"); } } } private void resolve(final CB0 cb) { if (hostname == null && byteIPAddr != null) { // Do a reverse DNS lookup AddressFactory.getDNSComm().getNameByIPAddr(byteIPAddr, new CB1<String>() { protected void cb(CBResult result, String hostname) { switch (result.state) { case OK: { log.debug("Hostname resolved: " + NetUtil.byteIPAddrToString(byteIPAddr) + "->" + hostname); NetAddress.this.hostname = hostname; cb.call(result); break; } case TIMEOUT: case ERROR: { log.warn("Could not reverse resolve ip=" + NetUtil.byteIPAddrToString(byteIPAddr)); cb.call(CBResult.OK()); break; } } } }); } else if (hostname != null && byteIPAddr == null) { // Do a DNS lookup AddressFactory.getDNSComm().getIPAddrByName(hostname, new CB1<byte[]>() { protected void cb(CBResult result, byte[] byteIPAddr) { switch (result.state) { case OK: { log.debug("Address resolved: " + hostname + "->" + NetUtil.byteIPAddrToString(byteIPAddr)); NetAddress.this.byteIPAddr = byteIPAddr; cb.call(result); break; } case TIMEOUT: case ERROR: { log.warn("Could not resolve host=" + hostname); cb.call(result); break; } } } }); } else { // Nothing to do EL.get().registerTimerCB(new CB0() { protected void cb(CBResult result) { cb.call(CBResult.OK()); } }); } } public String getHostname() { if (hostname == null) return (byteIPAddr != null) ? NetUtil.byteIPAddrToString(byteIPAddr) : "?"; else return hostname; } public int getPort() { return port; } public boolean hasPort() { return port != UNKNOWN_PORT; } public byte[] getByteIPAddr() { return byteIPAddr; } public int getIntIPAddr() { return NetUtil.byteIPToIntIP(byteIPAddr); } public InetSocketAddress getInetSocketAddress() { assert port != UNKNOWN_PORT : "Tried to access a socket addr without a port number. hostname=" + hostname; assert inetSocketAddress != null : "Tried to access an address that could not be resolved. 
hostName=" + hostname; return inetSocketAddress; } public InetAddress getInetAddress() { assert inetSocketAddress != null || inetAddress != null; return inetSocketAddress != null ? inetSocketAddress.getAddress() : inetAddress; } public byte[] getByteIPAddrPort() { ByteBuffer bb = ByteBuffer.allocate(8); bb.put(byteIPAddr); bb.putInt(port); return bb.array(); } public boolean isWildcard() { return isWildcardAddress; } public boolean isResolved() { return (byteIPAddr != null); } /* * @see edu.harvard.syrah.sbon.communication.AddressIF#equals(edu.harvard.syrah.sbon.communication.AddressIF) */ public boolean equals(Object obj) { assert obj instanceof NetAddress : "Cannot compare NetAddress to non-NetAddress."; NetAddress cmpAddress = (NetAddress) obj; if (unresolvable) return hostname.equals(cmpAddress.hostname); return byteIPAddr[0] == cmpAddress.byteIPAddr[0] && byteIPAddr[1] == cmpAddress.byteIPAddr[1] && byteIPAddr[2] == cmpAddress.byteIPAddr[2] && byteIPAddr[3] == cmpAddress.byteIPAddr[3] && port == cmpAddress.port; } public int compareTo(Object obj) { assert obj instanceof NetAddress : "Cannot compare NetAddress to non-NetAddress."; NetAddress cmpAddress = (NetAddress) obj; if (unresolvable) return hostname.compareTo(cmpAddress.hostname); for (int i = 0; i < 4; i++) { if (byteIPAddr[i] < cmpAddress.byteIPAddr[i]) return -1; if (byteIPAddr[i] > cmpAddress.byteIPAddr[i]) return 1; } return 0; } public int hashCode() { if (unresolvable) return hostname.hashCode(); assert byteIPAddr != null : "byteIPAddr=null hostname=" + hostname; return byteIPAddr[0] + byteIPAddr[1] + byteIPAddr[2] + byteIPAddr[3] + port; } public String toString() { return toString(true); } public String toString(boolean colour) { String ipStr = (byteIPAddr != null) ? NetUtil.byteIPAddrToString(byteIPAddr) : "?"; // String outputHostname = (hostname == null) ? "/" + ipStr : hostname + "/" // + ipStr; String outputHostname = (hostname == null) ? 
"/" + ipStr : hostname; // + // "/" // + // ipStr; String addPort = (port != UNKNOWN_PORT) ? ":" + port : ""; if (colour) return ANSI.color(ADDR_COLOR, outputHostname + addPort); return outputHostname + addPort; } public static void main(String[] args) { EL.set(new EL(0, false)); ANSI.use(true); String[] oneAddr = args; /* String[] fewAddr = new String[] { "www.leo.org:80", "www.yahoo.com:80", "ypsilantigreens.blogspot.com:80", "iowageek.blogspot.com:80", "speedmerchant.blogspot.com:80", "www.cl.cam.ac.uk:80", "speedmerchant.blogspot.com:80", "speedmerchant.blogspot.com:80", "www.amazon.com:80", "asdsd.asdsadsa.asdsad.asdsa:80", "www.jambo.net:80", "blogs.codehaus.org:80", "greatgodpan.com:80", "www.nytimes.com:80", "www.cartoonchurch.com:80", "feeds.feedburner.com:80", "farneweblog.com:80", "larvalife.blogspot.com:80", "larvalife.blogspot.com:80", "blogs.sun.com:80", "www.rklau.com:80", "www.slashdot.org:80" }; String[] manyAddr = new String[] { "ypsilantigreens.blogspot.com:80", "speedmerchant.blogspot.com:80", "ingchingcrazycorner.blogspot.com:80", "moshiachblog.blogspot.com:80", "sleepdreaming.blogspot.com:80", "sigurdtosigurd.blogspot.com:80", "mygaymarriage.blogspot.com:80", "loualfeld.blogspot.com:80", "siewyee.blogspot.com:80", "lisasamson.blogspot.com:80", "theriverspeaks.blogspot.com:80", "globalocal.blogspot.com:80", "larvalife.blogspot.com:80", "mlmfuture.blogspot.com:80", "megaherzeleid.blogspot.com:80", "leftinthereign.blogspot.com:80", "bigfigure.blogspot.com:80", "imdebsnaps.blogspot.com:80", "louisvillemark.blogspot.com:80", "IowaGeek.blogspot.com:80", "hungryhyaena.blogspot.com:80", "soulsalliance.blogspot.com:80", "cornandpork.blogspot.com:80", "dmcc.blogspot.com:80", "doggieger.blogspot.com:80", "kindredpain.blogspot.com:80", "afreevietnam.blogspot.com:80", "bulldogbadger.blogspot.com:80", "kymc.blogspot.com:80", "thehermitess.blogspot.com:80", "timmyaffil.blogspot.com:80", "prabato.blogspot.com:80", "downingstreet.blogspot.com:80", 
"feveredrants.blogspot.com:80", "goonblog.blogspot.com:80", "wingsofeaglesmin.blogspot.com:80", "politicalquestion.blogspot.com:80", "padup.blogspot.com:80", "mindlessinottawa.blogspot.com:80", "chrth.blogspot.com:80", "goatlogic.blogspot.com:80", "blueyedtracy.blogspot.com:80", "circumlocute.blogspot.com:80", "japanpotato.blogspot.com:80", "injudiciousgardening.blogspot.com:80", "englfbatista.blogspot.com:80", "kendblog.blogspot.com:80", "deidreknight.blogspot.com:80", "islanduniverse2065.blogspot.com:80", "lankachildsupport.blogspot.com:80", "allianceofpower.blogspot.com:80", "tweakandbeat.blogspot.com:80", "ebookgroup.blogspot.com:80", "prumble.blogspot.com:80", "macgenie.blogspot.com:80", "welcomematt.blogspot.com:80", "knittingchick12.blogspot.com:80", "atlmmim.blogspot.com:80", "janestarr.blogspot.com:80", "ericsimonson.blogspot.com:80", "blacksheeppress.blogspot.com:80", "stirfriedheart.blogspot.com:80", "carinmincemoyer.blogspot.com:80", "attemptingescape.blogspot.com:80", "trondant.blogspot.com:80", "sanchar.blogspot.com:80", "neveragainmichel.blogspot.com:80", "rgsspecialists.blogspot.com:80", "iamshahid.blogspot.com:80", "unobserver.blogspot.com:80", "zzonkedd.blogspot.com:80", "jasonjenny.blogspot.com:80", "addaboy.blogspot.com:80", "tonym1203.blogspot.com:80", "vancouvercanuckshockey.blogspot.com:80", "crumblehall.blogspot.com:80", "blogofdisquietude.blogspot.com:80", "katemcdonald.blogspot.com:80", "sharedchanges.blogspot.com:80", "juliepede.blogspot.com:80", "asilentcacophony.blogspot.com:80", "mogageek.blogspot.com:80", "fireblazt.blogspot.com:80", "reflectingtheglory.blogspot.com:80", "rfseen.blogspot.com:80", "confrontingtheculture.blogspot.com:80", "lamontestamps.blogspot.com:80", "typenoevil.blogspot.com:80", "eccentricpat.blogspot.com:80", "the40s.blogspot.com:80", "justgina.blogspot.com:80", "windmillspinner.blogspot.com:80", "apancado.blogspot.com:80", "dyingtouse.blogspot.com:80", "greatebooks3.blogspot.com:80", 
"greatebooks4.blogspot.com:80", "cgamg.blogspot.com:80", "viciouspancake.blogspot.com:80", "79432.blogspot.com:80", "swirlymuffins.blogspot.com:80", "johnnyawesomo.blogspot.com:80", "scra.blogspot.com:80", "abusinan.blogspot.com:80", "mumpsimus.blogspot.com:80", "wwjd5.blogspot.com:80", "andyatkinskruger.blogspot.com:80", "wallywatch.blogspot.com:80", "furtherramblings.blogspot.com:80", "bemuseme.blogspot.com:80", "achipofftheoldblog.blogspot.com:80", "walrusomnibus.blogspot.com:80", "lawbot.blogspot.com:80", "stjacques.blogspot.com:80", "andrewvis.blogspot.com:80", "dissonantculture.blogspot.com:80", "kadnine.blogspot.com:80", "caipirinhasinenglish.blogspot.com:80", "truckerphilosophy.blogspot.com:80", "jreflect.blogspot.com:80", "bettysutility.blogspot.com:80", "anytownusa.blogspot.com:80", "geekymom.blogspot.com:80", "livetoad.blogspot.com:80", "myfearlessbounce.blogspot.com:80", "saddamhussein.blogspot.com:80", "forensicsandfaith.blogspot.com:80", "mysteryachievement.blogspot.com:80", "carpeimperiummundo.blogspot.com:80", "lizholmes.blogspot.com:80", "angryamerican.blogspot.com:80", "jaybradfield.blogspot.com:80", "desiyah.blogspot.com:80", "allupinhere.blogspot.com:80", "chiho103.blogspot.com:80", "disciplerefuge.blogspot.com:80", "coolpri.blogspot.com:80", "scholasticum.blogspot.com:80", "newyorkerintokyo.blogspot.com:80", "trustysteed.blogspot.com:80", "diaryofrobinkathleen.blogspot.com:80", "gaskinbalrog.blogspot.com:80", "xtines.blogspot.com:80", "sfchroniclebiz.blogspot.com:80", "haystacks37643.blogspot.com:80", "avirandomthots.blogspot.com:80", "ximebangalore.blogspot.com:80", "ronanj.blogspot.com:80", "sketchy1.blogspot.com:80", "drhanleydocs.blogspot.com:80", "jemappellesam.blogspot.com:80", "londonmark.blogspot.com:80", "loveyeshua.blogspot.com:80", "grecianed.blogspot.com:80", "somethingtolookforwardto.blogspot.com:80", "hawkesmorenomore.blogspot.com:80", "holycommunion.blogspot.com:80", "maskedmoviesnobs.blogspot.com:80", 
"politzero.blogspot.com:80", "jimbuck2.blogspot.com:80", "twiceaheretic.blogspot.com:80", "mvbarer.blogspot.com:80", "sandflies.blogspot.com:80", "csunmod.blogspot.com:80", "fallujapictures.blogspot.com:80", "vatsaview.blogspot.com:80", "schaferspoetry.blogspot.com:80", "asubmissivejourney.blogspot.com:80", "themainthing.blogspot.com:80", "ctenuchid.blogspot.com:80", "victro.blogspot.com:80", "thereportcard.blogspot.com:80", "littledivinities.blogspot.com:80", "rk77.blogspot.com:80", "ncsteve.blogspot.com:80", "kohoco.blogspot.com:80", "magicadvocate.blogspot.com:80", "thedevilsplaything.blogspot.com:80", "zubari.blogspot.com:80", "entiatalk.blogspot.com:80", "dennismccowantheviewfrommissouri.blogspot.com:80", "futenma2.blogspot.com:80", "netajimystery.blogspot.com:80", "mainepolitics.blogspot.com:80", "crawlingoffthealtar.blogspot.com:80", "meditativerose.blogspot.com:80", "worldmissions.blogspot.com:80", "ionicus.blogspot.com:80", "barisaxyvet.blogspot.com:80" }; */ final List<String> addrList = new ArrayList<String>(); for (String a : oneAddr) addrList.add(a); final int totalNum = addrList.size(); log.main("addrList.size=" + totalNum); EL.get().registerTimerCB(new CB0() { protected void cb(CBResult result) { AddressFactory.createResolved(addrList, new CB1<Map<String, AddressIF>>() { protected void cb(CBResult result, Map<String, AddressIF> addressMap) { switch (result.state) { case OK: { log.main("Got addressMap.keyset.size=" + addressMap.keySet().size()); for (String addrStr : addressMap.keySet()) { AddressIF addr = addressMap.get(addrStr); log.main("address=" + addrStr + " resolvedAddr=" + addr + " address.byteIP=" + NetUtil.byteIPAddrToString(((NetAddress) addr).getByteIPAddr())); } log.main("missing=" + (totalNum - addressMap.keySet().size())); break; } case ERROR: { log.error("result.what=" + result.what); break; } } } }); } }); /* EventLoop.get().registerTimerCB(100, new EventCB() { protected void cb(CBResult result, Event arg1) { EventLoop.get().exit(); } 
}); */ EL.get().main(); } } --- NEW FILE: AddressFactory.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 12, 2005 */ package edu.harvard.syrah.sbon.async.comm; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.async.Barrier; import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.EL; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.async.comm.dns.DNSComm; import edu.harvard.syrah.sbon.async.comm.dns.DNSCommIF; /** * * Factory classes for addresses * */ public class AddressFactory { protected static final Log log = new Log(AddressFactory.class); private static DNSCommIF dnsComm; static { dnsComm = new DNSComm(); // DNSServer will dynamically listen to the right port dnsComm.initServer(new CB0() { protected void cb(CBResult result) { switch (result.state) { case TIMEOUT: case ERROR: { log.error(result.toString()); break; } } } }); } // Copy constructor public static AddressIF create(AddressIF addr) { if (!(addr instanceof NetAddress)) { log.error("Can only copy a NetAddress: " + addr); } NetAddress copyAddr = (NetAddress) addr; return new NetAddress(copyAddr.hostname, copyAddr.byteIPAddr, copyAddr.port, null); } public static AddressIF createUnresolved(String addressString) { return new NetAddress(addressString, null, NetAddress.UNKNOWN_PORT, null); } public static AddressIF createUnresolved(String hostname, int port) { return new NetAddress(hostname, null, port, null); } public static void createResolved(String hostname, int port, CB1<AddressIF> cbAddress) { new NetAddress(hostname, null, port, 
cbAddress); } public static void createResolved(String addressString, CB1<AddressIF> cbAddress) { new NetAddress(addressString, cbAddress); } public static void createResolved(List<String> addressStrings, int port, final CB1<Map<String, AddressIF>> cbAddressMap) { final Barrier lookupBarrier = new Barrier(true); final Map<String, AddressIF> addressMap = new HashMap<String, AddressIF>(); for (final String addressString : addressStrings) { lookupBarrier.fork(); new NetAddress(addressString, port, new CB1<AddressIF>() { protected void cb(CBResult result, AddressIF address) { switch (result.state) { case OK: { addressMap.put(addressString, address); // log.debug("addressMap.keySet.size()=" + // addressMap.keySet().size()); break; } case TIMEOUT: case ERROR: { log.warn("Could not resolve: address=" + address + " result=" + result); break; } } lookupBarrier.join(); } }); } EL.get().registerTimerCB(lookupBarrier, new CB0() { protected void cb(CBResult result) { cbAddressMap.call(CBResult.OK(), addressMap); } }); } public static void createResolved(String[] addressStrings, final CB1<Map<String, AddressIF>> cbAddressMap) { createResolved(Arrays.asList(addressStrings), cbAddressMap); } public static void createResolved(List<String> addressStrings, final CB1<Map<String, AddressIF>> cbAddressMap) { final Barrier lookupBarrier = new Barrier(true); final Map<String, AddressIF> addressMap = new HashMap<String, AddressIF>(); for (final String addressString : addressStrings) { lookupBarrier.fork(); new NetAddress(addressString, new CB1<AddressIF>() { protected void cb(CBResult result, AddressIF address) { switch (result.state) { case OK: { addressMap.put(addressString, address); // log.debug("addressMap.keySet.size()=" + // addressMap.keySet().size()); break; } case TIMEOUT: case ERROR: { log.warn("Could not resolve: address=" + address + " result=" + result); break; } } lookupBarrier.join(); } }); } EL.get().registerTimerCB(lookupBarrier, new CB0() { protected void cb(CBResult 
result) { cbAddressMap.call(CBResult.OK(), addressMap); } }); } public static void createResolved(byte[] byteIPAddr, int port, CB1<AddressIF> cbAddress) { new NetAddress(null, byteIPAddr, port, cbAddress); } public static void createResolved(SocketAddress socketAddress, CB1<AddressIF> cbAddress) { new NetAddress(null, ((InetSocketAddress) socketAddress).getAddress().getAddress(), ((InetSocketAddress) socketAddress).getPort(), cbAddress); } public static void createResolved(InetAddress inetAddress, int port, CB1<AddressIF> cbAddress) { new NetAddress(null, inetAddress.getAddress(), port, cbAddress); } public static void createResolved(byte[] byteIPAddrPort, CB1<AddressIF> cbAddress) { ByteBuffer bb = ByteBuffer.wrap(byteIPAddrPort); byte[] byteIPAddr = new byte[4]; bb.get(byteIPAddr); int port = bb.getInt(); new NetAddress(null, byteIPAddr, port, cbAddress); } public static AddressIF createLocalhost(int port) { return new NetAddress("localhost", new byte[] { 127, 0, 0, 1 }, port, null); } public static AddressIF createLocal(int port) { try { return new NetAddress(port); } catch (UnknownHostException e) { log.error("Local host unknown? " + e); } return null; } public static AddressIF createLocalAddress() { return createLocal(NetAddress.UNKNOWN_PORT); } public static AddressIF createServer(int port) { return new NetAddress(null, new byte[] { 0, 0, 0, 0 }, port, null); } public static AddressIF create(AddressIF address, int port) { NetAddress netAddress = (NetAddress) address; return new NetAddress(netAddress.getHostname(), netAddress.getByteIPAddr(), port, null); } static DNSCommIF getDNSComm() { if (dnsComm == null) log.error("No DNSComm service created. Cannot do lookups."); return dnsComm; } } --- NEW FILE: AddressIF.java --- /* * SBON * * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:41 $ * @since Jan 5, 2005 */ package edu.harvard.syrah.sbon.async.comm; import java.net.InetAddress; /** ... 
[truncated message content] |
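The list-based createResolved methods in the message above fork one lookup per hostname, join as each callback fires, and hand the caller a map holding only the names that resolved. A minimal sketch of that fork/join idea follows, assuming the standard CompletableFuture API in place of AsyncJ's Barrier and CB1 types. The class name ResolveAllSketch, the resolver parameter, and the fake resolver in main are hypothetical and not part of the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: not the AsyncJ implementation.
final class ResolveAllSketch {

    // Starts one lookup per name and completes once every lookup has finished.
    // Failed lookups are skipped, so the map holds only the names that resolved.
    static <A> CompletableFuture<Map<String, A>> resolveAll(
            List<String> names, Function<String, CompletableFuture<A>> resolver) {
        // Futures may complete on different threads, hence a concurrent map.
        Map<String, A> resolved = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String name : names) {
            // "fork": start the lookup; handle() runs on success and on failure.
            pending.add(resolver.apply(name).<Void>handle((addr, err) -> {
                if (err == null && addr != null) {
                    resolved.put(name, addr);
                }
                return null;
            }));
        }
        // "join": allOf completes when all per-name futures are done.
        return CompletableFuture
                .allOf(pending.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> resolved);
    }

    public static void main(String[] args) {
        // Fake resolver for illustration only: names starting with "bad" fail.
        Function<String, CompletableFuture<String>> fake = name -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (name.startsWith("bad")) {
                f.completeExceptionally(new RuntimeException("no such host"));
            } else {
                f.complete("10.0.0.1");
            }
            return f;
        };
        System.out.println(resolveAll(Arrays.asList("a.example", "bad.example"), fake).join());
    }
}
```

As in the committed code, a failed lookup does not fail the whole batch, so callers need to check which names are present in the returned map.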
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:40
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/scriptroute In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21526/src/edu/harvard/syrah/sbon/async/comm/scriptroute Added Files: RemoteSRRequestMsg.java RemoteSRErrorMsg.java ScriptrouteCB.java RemoteSRResponseMsg.java ScriptrouteIF.java SRRequest.java Scriptroute.java Log Message: Initial commit of AsyncJ project needed for Pyxida --- NEW FILE: RemoteSRRequestMsg.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Sep 20, 2005 */ package edu.harvard.syrah.sbon.async.comm.scriptroute; import edu.harvard.syrah.sbon.async.comm.AddressIF; import edu.harvard.syrah.sbon.async.comm.obj.ObjMessage; class RemoteSRRequestMsg extends ObjMessage { static final long serialVersionUID = 1000000001L; AddressIF nodeA; AddressIF nodeB; int numMeasurements; RemoteSRRequestMsg(AddressIF nodeA, AddressIF nodeB, int numMeasurements) { this.nodeA = nodeA; this.nodeB = nodeB; this.numMeasurements = numMeasurements; } public String toString() { return "RemoteSRRequestMsg: " + nodeA + "->" + nodeB + " numMeasurements=" + numMeasurements; } } --- NEW FILE: ScriptrouteIF.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Sep 10, 2005 */ package edu.harvard.syrah.sbon.async.comm.scriptroute; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.async.comm.AddressIF; /** * */ public interface ScriptrouteIF { public void getLatencies(AddressIF nodeA, AddressIF nodeB, int numMeasurements, CB1<double[]> cbLatencies); public void measureLatencies(AddressIF remoteNode, int numMeasurements, final ScriptrouteCB cbLatencies); } --- NEW FILE: ScriptrouteCB.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Sep 19, 2005 */ package edu.harvard.syrah.sbon.async.comm.scriptroute; import 
edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; public abstract class ScriptrouteCB extends CB1<double[]> { protected abstract void cb(CBResult result, double[] latencies); } --- NEW FILE: Scriptroute.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Sep 10, 2005 */ package edu.harvard.syrah.sbon.async.comm.scriptroute; import java.io.IOException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import edu.harvard.syrah.prp.ANSI; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.prp.POut; import edu.harvard.syrah.prp.PUtil; import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.EL; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.async.comm.*; import edu.harvard.syrah.sbon.async.comm.obj.*; /** * */ public class Scriptroute extends TCPComm implements ScriptrouteIF { protected static final Log log = new Log(Scriptroute.class); private static final long LATENCY_MEASUREMENT_TIMEOUT = 30000; public static final int UNKNOWN_LATENCY = -1; private static final boolean DEFAULT_KEEP_ALIVE = true; // TODO add code that figures this out on the fly private static final String DEFAULT_SLICE_NAME = "harvard_sbon"; private static final int DEFAULT_SLICE_UID = 902; private static final int SR_PORT = 3356; private static final long CONN_TIMEOUT = 10000; private static Scriptroute singleInstance = null; protected ObjCommIF comm; protected String sliceName; protected int sliceUID; private AddressIF srAddress; private boolean keepAlive; public Scriptroute(ObjCommIF comm) { this.comm = comm; this.sliceName = DEFAULT_SLICE_NAME; this.sliceUID = DEFAULT_SLICE_UID; this.srAddress = 
AddressFactory.createLocalhost(SR_PORT); this.keepAlive = DEFAULT_KEEP_ALIVE; if (singleInstance == null) singleInstance = this; else log.error("Only a single instance of Scriptroute supported."); comm.registerMessageCB(RemoteSRRequestMsg.class, new RemoteSRRequestMsgHandler()); } public void getLatencies(final AddressIF nodeA, final AddressIF nodeB, int numMeasurements, final CB1<double[]> cbLatencies) { log.debug("getLatencies nodeA=" + nodeA + " nodeB=" + nodeB + " num=" + numMeasurements); ObjMessageIF msg = new RemoteSRRequestMsg(nodeA, nodeB, numMeasurements); comm.sendRequestMessage(msg, nodeA, new ObjCommRRCB<RemoteSRResponseMsg>(LATENCY_MEASUREMENT_TIMEOUT) { protected void cb(CBResult result, RemoteSRResponseMsg responseMsg, AddressIF remoteAddr, Long ts) { log.debug("Received: respMsg=" + responseMsg + " state=" + result + " remoteAddr=" + remoteAddr); //if (responseMsg == null || cbHandled == null) { // log.error("responseMsg=null"); //} switch (result.state) { case OK: { cbLatencies.call(result, responseMsg.latencies); break; } case TIMEOUT: case ERROR: { //log.warn ("its a timeout or error"); log.warn("Could not getLatency from nodeA=" + nodeA + "->nodeB=" + nodeB + ": " + result.what); cbLatencies.call(CBResult.ERROR("Could not getLatency from nodeA=" + nodeA + "->nodeB=" + nodeB + ": " + result.what), null); break; } } } }, new ObjCommRRCB<RemoteSRErrorMsg>() { protected void cb(CBResult result, RemoteSRErrorMsg errorMsg, AddressIF remoteAddr, Long ts) { log.debug("Received: " + errorMsg + " state=" + result + " for nodeA=" + nodeA + "->nodeB=" + nodeB); cbLatencies.call(CBResult.ERROR("Remote measurement to node=" + remoteAddr + " failed: " + errorMsg.result.what), null); } }); } public void measureLatencies(AddressIF remoteNode, int numMeasurements, final ScriptrouteCB cbLatencies) { SRRequest pingRequest = new SRPingRequest(remoteNode, keepAlive, numMeasurements, cbLatencies); WriteConnHandler writeConnHandler = (WriteConnHandler) 
destConnectionPool.get(srAddress); log.debug("writeConnHandler=" + writeConnHandler + " srAddr=" + srAddress); if (writeConnHandler == null || (!checkConnection(writeConnHandler))) { Comm.ConnectConnHandler connectConnHandler = super.createConnection(srAddress, keepAlive, CONN_TIMEOUT, new CB0() { protected void cb(CBResult result) { switch (result.state) { case OK: { /* empty */ break; } case TIMEOUT: case ERROR: { log.debug(result.toString()); cbLatencies.call(result, null); break; } } } }); if (connectConnHandler != null) { ((ConnectConnHandler) connectConnHandler).addRequest(pingRequest); } } else { writeConnHandler.addRequest(pingRequest); } //log.debug("pendingConnectionPool=" + Util.toString(pendingConnectionPool.keySet())); } @Override protected AcceptConnHandler getAcceptConnHandler(NetAddress localAddress, CB0 cbHandler) { return null; } @Override protected ConnectConnHandler getConnectConnHandler(NetAddress remoteAddress, long timeout, CB0 cbHandler) { return new ConnectConnHandler(remoteAddress, timeout, cbHandler); } @Override protected TimeoutHandler createTimeoutHandler(SelectableChannel channel, AddressIF remoteAddr, long timeout) { return new TimeoutHandler(channel, remoteAddr, timeout); } protected WriteConnHandler getWriteConnHandler(SelectableChannel channel, NetAddress remoteAddr) { WriteConnHandler writeConnHandler = (WriteConnHandler) EL.get().getCommCB(channel, SelectionKey.OP_WRITE); if (writeConnHandler == null) { writeConnHandler = new WriteConnHandler(channel, remoteAddr); destConnectionPool.put(remoteAddr, writeConnHandler); } return writeConnHandler; } protected ReadConnHandler getReadConnHandler(SelectableChannel channel, NetAddress remoteAddr) { ReadConnHandler readConnHandler = (ReadConnHandler) EL.get().getCommCB(channel, SelectionKey.OP_READ); if (readConnHandler == null) { readConnHandler = new ReadConnHandler(channel, remoteAddr); } return readConnHandler; } class ConnectConnHandler extends TCPComm.ConnectConnHandler { private 
final Log log = new Log(ConnectConnHandler.class); private List<SRRequest> requestList = new LinkedList<SRRequest>(); ConnectConnHandler(NetAddress destAddr, long timeout, CB0 handlerCB) { super(destAddr, timeout, handlerCB); } public Boolean cb(CBResult result, SelectionKey key) { super.cb(result, key); if (key.isValid() && key.isConnectable()) { ReadConnHandler readResponseConnHandler = getReadConnHandler(channel, remoteAddr); WriteConnHandler writeConnHandler = getWriteConnHandler(channel, remoteAddr); for (Iterator<SRRequest> requestIt = requestList.iterator(); requestIt.hasNext();) { SRRequest request = requestIt.next(); writeConnHandler.addRequest(request); requestIt.remove(); } // Return connection callbacks cbHandler.call(CBResult.OK()); } return true; } protected void addRequest(SRRequest pingRequest) { requestList.add(pingRequest); } } class WriteConnHandler extends TCPComm.WriteConnHandler { private final Log log = new Log(WriteConnHandler.class); private List<SRRequest> requestList = new LinkedList<SRRequest>(); private SRRequest currentRequest = null; WriteConnHandler(SelectableChannel channel, NetAddress remoteAddr) { super(channel, remoteAddr); } public Boolean cb(CBResult result, SelectionKey key) { super.cb(result, key); SocketChannel socketChannel = (SocketChannel) channel; if (key.isValid() && key.isWritable()) { if ((buffer.position() == 0)) { if (!handleNextRequest()) { deregister(); return true; } } buffer.flip(); int count = 0; try { count = socketChannel.write(buffer); } catch (IOException e) { log.warn("Error writing to socket: " + e); closeConnection(remoteAddr, socketChannel); return true; } log.debug("Writing to socket... count=" + count); int limit = buffer.limit(); // log.debug("limit=" + limit); buffer.compact(); buffer.position(limit - count); if (buffer.position() == 0) { log.debug("Write buffer is empty."); this.deregister(); } } else { log.error("We received a writable callback for a key that is not writable. 
Bug?"); } return true; } protected void addRequest(SRRequest request) { // Only register the handler if there are no requests if (requestList.isEmpty()) { this.register(); } requestList.add(request); } private boolean handleNextRequest() { if (currentRequest == null) { currentRequest = new SRHelloRequest(sliceName, sliceUID); } else { while (!requestList.isEmpty()) { currentRequest = requestList.get(0); if (currentRequest.isHandled()) { requestList.remove(currentRequest); } else break; } } if (currentRequest.isHandled() && requestList.isEmpty()) { if (!currentRequest.keepAlive()) { currentRequest = new SRByeRequest(); } else { return false; } } log.debug("Handling next request " + currentRequest); currentRequest.write(buffer); ReadConnHandler readConnHandler = getReadConnHandler(channel, remoteAddr); readConnHandler.currentRequest = currentRequest; readConnHandler.register(); return true; } public boolean hasMoreRequests() { return !requestList.isEmpty(); } } class ReadConnHandler extends TCPComm.ReadConnHandler { private final Log log = new Log(ReadConnHandler.class); // Size of the buffer used for reading from connections private static final int MAX_BUFFER_SIZE = 32768 * 4; protected SRRequest currentRequest = null; ReadConnHandler(SelectableChannel channel, NetAddress remoteAddr) { super(channel, remoteAddr); } public Boolean cb(CBResult result, SelectionKey key) { super.cb(result, key); if (key.isValid() && key.isReadable()) { //log.debug("Reading..."); channel = key.channel(); SocketChannel socketChannel = (SocketChannel) channel; if (buffer.remaining() == 0) { log.debug("The read buffer is full. buffer=" + buffer + ". Extending..."); buffer = PUtil.extendByteBuffer(buffer, buffer.capacity() * 2); buffer.limit(buffer.capacity()); log.debug("new buffer=" + buffer); } int count = 0; try { count = socketChannel.read(buffer); } catch (IOException e) { // The other party closed the connection -- handle this gracefully. 
log.debug("Error reading from socket: " + e); count = -1; } log.debug("Reading from socket... count=" + count + " buffer=" + buffer); if (count == -1 || count == 0) { /* * There is some confusion on what count == 0 means but we'll just treat it as EOF and close the connection. */ log.debug("Connection to remoteAddr=" + remoteAddr + " has been closed."); closeConnection(remoteAddr, socketChannel); return true; } buffer.flip(); boolean doneParsing = currentRequest.parse(buffer); if (doneParsing) { //buffer.position(buffer.limit()); //buffer.compact(); buffer.clear(); if (currentRequest instanceof SRByeRequest) { log.debug("Received peace out"); closeConnection(remoteAddr, socketChannel); return true; } if (channel.isOpen()) { WriteConnHandler writeConnHandler = (WriteConnHandler) EL.get().getCommCB(channel, SelectionKey.OP_WRITE); assert writeConnHandler != null; this.deregister(); writeConnHandler.register(); } else { log.debug("Channel has been closed"); } } else { buffer.position(buffer.limit()); buffer.limit(buffer.capacity()); } log.debug("before next read=" + buffer); } return true; } private void setRequest(SRRequest request) { this.currentRequest = request; } } class TimeoutHandler extends TCPComm.TimeoutHandler { private final Log log = new Log(TimeoutHandler.class); TimeoutHandler(SelectableChannel channel, AddressIF remoteAddr, long timeout) { super(channel, remoteAddr, timeout); } @Override protected void timeoutConnection() { WriteConnHandler writeConnHandler = getWriteConnHandler(channel, (NetAddress) remoteAddr); writeConnHandler.addRequest(new SRByeRequest()); } } class RemoteSRRequestMsgHandler extends ObjCommCB<RemoteSRRequestMsg> { protected void cb(CBResult result, final RemoteSRRequestMsg requestMsg, final AddressIF remoteAddr, Long ts, final CB1<Boolean> cbHandled) { log.debug ("Received: " + requestMsg); switch (result.state) { case OK: { cbHandled.call(result, true); final AddressIF nodeB = requestMsg.nodeB; int numMeasurements = 
requestMsg.numMeasurements; measureLatencies(nodeB, numMeasurements, new ScriptrouteCB() { protected void cb(CBResult result, double[] latencies) { switch (result.state) { case OK: { ObjMessageIF responseMsg = new RemoteSRResponseMsg(requestMsg.nodeA, nodeB, latencies); log.debug ("Sent SRResponse to " + remoteAddr); comm.sendResponseMessage(responseMsg, remoteAddr, requestMsg.getMsgId(), new CB0() { protected void cb(CBResult result) { /* empty */ } }); break; } case TIMEOUT: case ERROR: { log.debug(result.toString()); log.debug ("Sent SRErrorResponse to " + remoteAddr); ObjMessageIF errorMsg = new RemoteSRErrorMsg(result); comm.sendErrorMessage(errorMsg, remoteAddr, requestMsg.getMsgId(), new CB0() { protected void cb(CBResult result) { /* empty */ } }); break; } } } }); break; } case TIMEOUT: case ERROR: { log.error(result.toString()); break; } } } } public static void main(String[] args) { ANSI.use(true); EL.set(new EL()); EL.get().registerTimerCB(new CB0() { protected void cb(CBResult result) { ObjCommIF comm = new ObjComm(); final Scriptroute sr = new Scriptroute(comm); AddressFactory.createResolved("sb04.eecs.harvard.edu", new CB1<AddressIF>() { protected void cb(CBResult result, final AddressIF nodeA) { AddressFactory.createResolved("sb10.eecs.harvard.edu", new CB1<AddressIF>() { protected void cb(CBResult result, final AddressIF nodeB) { // sr.getLatency(nodeA, nodeB, 3, new CB<Double[]>() { // protected void cb(CBResult result, Double[] latencies) { // log.main("Measured latencies between nodeA=" + nodeA + " and nodeB=" + nodeB + " are " // + POut.toString(latencies)); // } // }); for (int i = 0; i < 50; i++) { sr.measureLatencies(nodeB, 3, new ScriptrouteCB() { protected void cb(CBResult result, double[] latencies) { switch (result.state) { case OK: { log.main("1. 
Measured latencies nodeA=" + nodeA + " --> nodeB=" + nodeB + " are " + POut.toString(latencies)); /* EventLoop.get().registerTimerCB(1000, new EventCB() { protected void cb(CBResult result, Event arg1) { sr.measureLatencies(nodeB, 3, new ScriptrouteCB() { protected void cb(CBResult result, double[] latencies) { log.main("2. Measured latencies nodeA=" + nodeA + " --> nodeB=" + nodeB + " are " + POut.toString(latencies)); } }); } }); */ break; } case TIMEOUT: case ERROR: { log.error(result.toString()); break; } } } }); } } }); } }); } }); EL.get().main(); } } --- NEW FILE: RemoteSRResponseMsg.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Sep 20, 2005 */ package edu.harvard.syrah.sbon.async.comm.scriptroute; import edu.harvard.syrah.prp.POut; import edu.harvard.syrah.sbon.async.comm.AddressIF; import edu.harvard.syrah.sbon.async.comm.obj.ObjMessage; class RemoteSRResponseMsg extends ObjMessage { static final long serialVersionUID = 1000000001L; AddressIF nodeA; AddressIF nodeB; double[] latencies; RemoteSRResponseMsg(AddressIF nodeA, AddressIF nodeB, double[] latencies) { this.nodeA = nodeA; this.nodeB = nodeB; this.latencies = latencies; } public String toString() { return "RemoteSRResponseMsg: " + nodeA + "->" + nodeB + " lat=" + POut.toString(latencies); } } --- NEW FILE: SRRequest.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Sep 19, 2005 */ package edu.harvard.syrah.sbon.async.comm.scriptroute; import java.nio.ByteBuffer; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.prp.NetUtil; import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.EL; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; import edu.harvard.syrah.sbon.async.comm.AddressIF; /** * * This class encapsulates the request/response interaction with Scriptroute * */ abstract class SRRequest { private boolean 
keepAlive; protected boolean isHandled = false; protected ScriptrouteCB cb; protected StringBuffer request = new StringBuffer(); SRRequest(boolean keepAlive) { this(keepAlive, null); } SRRequest(boolean keepAlive, ScriptrouteCB cb) { this.keepAlive = keepAlive; this.cb = cb; } abstract void write(ByteBuffer buffer); abstract boolean parse(ByteBuffer buffer); boolean keepAlive() { return keepAlive; } boolean isHandled() { return isHandled; } ScriptrouteCB getCB() { return cb; } } class SRHelloRequest extends SRRequest { private static final Log log = new Log(SRHelloRequest.class); private static final byte[] INTERPRET = NetUtil.toHTTPBytes("interpret v0.3.9 "); private static final String PROCEED = "proceed"; private String sliceName; private int sliceUID; SRHelloRequest(String sliceName, int sliceUID) { super(true); this.sliceName = sliceName; this.sliceUID = sliceUID; } @Override void write(ByteBuffer buffer) { buffer.put(INTERPRET); buffer.put(NetUtil.toHTTPBytes(sliceName + "(" + sliceUID + ")")); buffer.put(NetUtil.toHTTPBytes("\n")); } @Override boolean parse(ByteBuffer buffer) { String response = new String(buffer.array(), buffer.position(), buffer.remaining()); response = response.trim(); log.debug("response='" + response + "'"); if (response.equals(PROCEED)) { return true; } return false; } public String toString() { return "HelloRequest: sliceName=" + sliceName + " sliceUID=" + sliceUID; } } class SRByeRequest extends SRRequest { private static final Log log = new Log(SRByeRequest.class); private static final byte[] THANKS = NetUtil.toHTTPBytes("thanks"); private static final String PEACE_OUT = "peace out"; SRByeRequest() { super(false); } @Override void write(ByteBuffer buffer) { buffer.put(THANKS); buffer.put(NetUtil.toHTTPBytes("\n")); } @Override boolean parse(ByteBuffer buffer) { String response = new String(buffer.array(), buffer.position(), buffer.remaining()); response = response.trim(); log.debug("response='" + response + "'"); if 
(response.equals(PEACE_OUT)) { return true; } return false; } public String toString() { return "ByeRequest"; } } class SRPingRequest extends SRRequest { protected static final Log log = new Log(SRPingRequest.class); private static final double DELAY = 0.0; private static final int ADDR_OFFSET = 16; private static final String SENDTRAIN = "sendtrain "; private static final String SENDTRAIN_CMD = "1/1 " + DELAY + " "; private AddressIF remoteAddr; private byte[] pingPacket = new byte[] { 69, 0, 0, 48, 0, 1, 0, 0, -1, 1, 0, 0, 0, 0, 0, 0, -116, -9, 60, 24, 8, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; protected double[] pingResults; private int measuredPings; SRPingRequest(AddressIF remoteAddr, boolean keepAlive, int numMeasurements, ScriptrouteCB cb) { super(keepAlive, cb); this.remoteAddr = remoteAddr; this.pingResults = new double[numMeasurements]; } void write(ByteBuffer buffer) { //sendtrain 1/1 0.000000 64 RQAA request = new StringBuffer(); request.append(SENDTRAIN); request.append(SENDTRAIN_CMD); byte[] destIP = remoteAddr.getByteIPAddrPort(); for (int i = 0; i <= 3; i++) pingPacket[ADDR_OFFSET + i] = destIP[i]; String encodedPacket = NetUtil.toBase64(pingPacket); log.debug("encoded='" + encodedPacket + "'"); request.append(encodedPacket.length()); request.append(" "); request.append(encodedPacket); log.debug("request='" + request + "'"); buffer.put(NetUtil.toHTTPBytes(request.toString())); buffer.put(NetUtil.toHTTPBytes("\n")); } private void addResult(double latency) { pingResults[measuredPings] = latency; measuredPings++; log.debug("Result: num=" + measuredPings + " lat=" + latency); } @Override boolean parse(ByteBuffer buffer) { double lastTS = 0; double lastLat = 0; String response = new String(buffer.array(), buffer.position(), buffer.remaining()); response = response.trim(); //log.debug("response='" + response + "'"); String[] responseLines = response.split("\n"); for (String responseLine : responseLines) { 
log.debug("responseLine='" + responseLine + "'"); if (responseLine.contains(">")) { String tsStr = responseLine.substring(0, responseLine.indexOf(" ")); //log.debug("tsStr1=" + tsStr); double readTS = Double.valueOf(tsStr); lastTS = readTS; } if (responseLine.contains("<")) { String tsStr = responseLine.substring(0, responseLine.indexOf(" ")); //log.debug("tsStr2=" + tsStr); double readTS = Double.valueOf(tsStr); assert lastTS != 0; lastLat = (readTS - lastTS) * 1000; lastTS = 0; } if (responseLine.contains("success")) { // Did we get a valid measurement? if (lastLat == 0) { log.warn("No ping response received from remoteAddr=" + remoteAddr); addResult(Scriptroute.UNKNOWN_LATENCY); } else { log.debug("lat=" + lastLat); addResult(lastLat); } if (measuredPings >= pingResults.length) { isHandled = true; log.debug("Executing application cb..."); EL.get().registerTimerCB(new CB0() { protected void cb(CBResult result) { cb.call(CBResult.OK(), pingResults); log.debug("Cb has returned"); } }); } return true; } } return false; } public String toString() { return "PingRequest: measuredPings=" + measuredPings + " remoteAddr=" + remoteAddr; } } --- NEW FILE: RemoteSRErrorMsg.java --- /* * @author Last modified by $Author: prp $ * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $ * @since Sep 20, 2005 */ package edu.harvard.syrah.sbon.async.comm.scriptroute; import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.comm.obj.ObjMessage; class RemoteSRErrorMsg extends ObjMessage { static final long serialVersionUID = 1000000001L; CBResult result; RemoteSRErrorMsg(CBResult result) { this.result = result; } public String toString() { return "RemoteSRErrorMsg: result=" + result.toString(); } } |
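SRPingRequest.parse in the message above derives each latency from two timestamped lines: a line containing ">" marks the probe being sent, a line containing "<" marks the reply, and their difference in seconds is multiplied by 1000 to give milliseconds. A "success" line closes the measurement, and a missing reply is recorded as UNKNOWN_LATENCY (-1). A minimal standalone sketch of that calculation follows. The class name PingLatencySketch and the sample response text are hypothetical, and the real Scriptroute output format is not shown in the commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the latency calculation; not the committed parser.
final class PingLatencySketch {
    static final double UNKNOWN_LATENCY = -1;

    // Returns one latency in milliseconds per "success" line in the response.
    static List<Double> parseLatencies(String response) {
        List<Double> latencies = new ArrayList<>();
        double sendTs = 0;     // timestamp (seconds) of the last sent probe
        double latencyMs = 0;  // 0 means "no reply seen yet"
        for (String raw : response.trim().split("\n")) {
            String line = raw.trim();
            if (line.contains(">")) {
                sendTs = leadingTimestamp(line);
            } else if (line.contains("<")) {
                latencyMs = (leadingTimestamp(line) - sendTs) * 1000;
                sendTs = 0;
            } else if (line.contains("success")) {
                latencies.add(latencyMs == 0 ? UNKNOWN_LATENCY : latencyMs);
                latencyMs = 0;
            }
        }
        return latencies;
    }

    // The timestamp is assumed to be the first space-delimited token on the line.
    private static double leadingTimestamp(String line) {
        return Double.parseDouble(line.substring(0, line.indexOf(' ')));
    }

    public static void main(String[] args) {
        // Assumed sample input: one probe/reply pair followed by "success".
        String sample = "1.250 > probe\n1.500 < reply\nsuccess\n";
        System.out.println(parseLatencies(sample));
    }
}
```

Unlike the committed parser, which returns after the first "success" line and relies on being called once per response, this sketch walks the whole string so that several measurements can be checked in a single call.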
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:40
|
Update of /cvsroot/pyxida/AsyncJ In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21526 Added Files: .project .classpath Log Message: Initial commit of AsyncJ project needed for Pyxida --- NEW FILE: .project --- <?xml version="1.0" encoding="UTF-8"?> <projectDescription> <name>AsyncJ</name> <comment></comment> <projects> </projects> <buildSpec> <buildCommand> <name>org.eclipse.jdt.core.javabuilder</name> <arguments> </arguments> </buildCommand> </buildSpec> <natures> <nature>org.eclipse.jdt.core.javanature</nature> </natures> </projectDescription> --- NEW FILE: .classpath --- <?xml version="1.0" encoding="UTF-8"?> <classpath> <classpathentry kind="src" path="src"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> <classpathentry combineaccessrules="false" kind="src" path="/Util-PRP"/> <classpathentry kind="lib" path="lib/xmlrpc-2.0-a1-dev.jar"/> <classpathentry kind="output" path="classes"/> </classpath> |
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/dns In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21526/src/edu/harvard/syrah/sbon/async/comm/dns Added Files: NSRR.java MGRR.java DNSQuery.java PTRRR.java HINFORR.java TXTRR.java RR.java DNSQuestion.java SOARR.java AAAARR.java DNSCommIF.java WKSRR.java ARR.java NULLRR.java MDRR.java DNSComm.java LICENSE.txt MRRR.java MBRR.java MINFORR.java MXRR.java CNAMERR.java MFRR.java Log Message: Initial commit of AsyncJ project needed for Pyxida --- NEW FILE: MBRR.java --- package edu.harvard.syrah.sbon.async.comm.dns; /** This class provides the <code>MB</code> RR record structure mapping. <pre> MB RDATA format (EXPERIMENTAL). +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ / MADNAME / / / +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ where: MADNAME A <domain-name> which specifies a host which has the specified mailbox. MB records cause additional section processing which looks up an A type RRs corresponding to MADNAME. </pre> <hr> Copyright (c) 2001-2004, Gregg Wonderly All rights reserved. <p> Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: <p> Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. <p> Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. <p> Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission. 
<p> THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @author Gregg Wonderly - gre...@po... */ public class MBRR extends CNAMERR { String aname; String cname; int recs = 1; int dlen; int sidx; public MBRR( byte[]pkt, int cidx ) { super(pkt, cidx); } public String getMAdName() { return getCName(); } } --- NEW FILE: ARR.java --- package edu.harvard.syrah.sbon.async.comm.dns; import edu.harvard.syrah.prp.Log; /** * This class provides the <code>A</code> RR record structure mapping. <pre> A RDATA format. +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ | ADDRESS | +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ where: ADDRESS A 32 bit Internet address. Hosts that have multiple Internet addresses will have multiple A records. A records cause no additional section processing. The RDATA section of an A line in a master file is an Internet address expressed as four decimal numbers separated by dots without any imbedded spaces (e.g., "10.2.0.52" or "192.0.5.6"). </pre> <hr> Copyright (c) 2001-2004, Gregg Wonderly All rights reserved. 
<p> Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: <p> Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. <p> Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. <p> Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission. <p> THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @author Gregg Wonderly - gre...@po... 
*/ public class ARR extends RR { private static final Log log = new Log(ARR.class); String host; String addr; byte[] byteAddr; int recs = 1; int dlen; public String toString() { return host+"\t\t"+super.toString()+"\t"+addr; } public ARR( byte[]pkt, int cidx ) { super(pkt, cidx, DNSQuestion.TYPE_A ); } public String getHost() { return host; } public String getIPAddress() { return addr; } public byte[] getByteIPAddress() { return byteAddr; } public int getRecordCount() { return recs; } public int dataLength() { return dlen; } public void processRecord( int cidx ) { StringBuffer buf = new StringBuffer(); int nmlen = cidx; int fidx = cidx; cidx = RR.extractName( rdata, cidx, rdata.length, buf ); host = buf.toString(); nmlen = cidx - nmlen; cidx = setParameters( rdata, cidx ); int b1 = rdata[cidx++]; int b2 = rdata[cidx++]; int b3 = rdata[cidx++]; int b4 = rdata[cidx++]; addr =(b1 & 0xff) + "." + (b2 & 0xff) + "." + (b3 & 0xff) + "." + (b4 & 0xff); byteAddr = new byte[] {(byte) (b1 & 0xff), (byte) (b2 & 0xff), (byte) (b3 & 0xff), (byte) (b4 & 0xff)}; dlen = cidx - fidx; //log.debug("A dlen=" + dlen); } } --- NEW FILE: AAAARR.java --- package edu.harvard.syrah.sbon.async.comm.dns; import edu.harvard.syrah.prp.Log; /** * * This class provides the <code>AAAA</code> RR record structure mapping. * * * @author Peter Pietzuch - pr...@do... 
*/ public class AAAARR extends RR { private static final Log log = new Log(AAAARR.class); String host; String addrv6; byte[] ipv6; int dlen; public String toString() { return host + "\t\t" + super.toString() + "\t" + addrv6; } public String getIPv6Address() { return addrv6; } public AAAARR(byte[] pkt, int cidx) { super(pkt, cidx, DNSQuestion.TYPE_AAAA); } public byte[] getIPv6() { return ipv6; } public int dataLength() { return dlen; } public void processRecord(int cidx) { StringBuffer buf = new StringBuffer(); int nmlen = cidx; int fidx = cidx; cidx = RR.extractName(rdata, cidx, rdata.length, buf); host = buf.toString(); nmlen = cidx - nmlen; cidx = setParameters(rdata, cidx); int b1 = rdata[cidx++]; int b2 = rdata[cidx++]; int b3 = rdata[cidx++]; int b4 = rdata[cidx++]; int b5 = rdata[cidx++]; int b6 = rdata[cidx++]; int b7 = rdata[cidx++]; int b8 = rdata[cidx++]; int b9 = rdata[cidx++]; int b10 = rdata[cidx++]; int b11 = rdata[cidx++]; int b12 = rdata[cidx++]; int b13 = rdata[cidx++]; int b14 = rdata[cidx++]; int b15 = rdata[cidx++]; int b16 = rdata[cidx++]; addrv6 = (b1 & 0xff) + "." + (b2 & 0xff) + "." + (b3 & 0xff) + "." + (b4 & 0xff) + "." + (b5 & 0xff) + "." + (b6 & 0xff) + "." + (b7 & 0xff) + "." + (b8 & 0xff) + "." + (b9 & 0xff) + "." + (b10 & 0xff) + "." + (b11 & 0xff) + "." + (b12 & 0xff) + "." + (b13 & 0xff) + "." + (b14 & 0xff) + "." + (b15 & 0xff) + "." + (b16 & 0xff); ipv6 = new byte[] { (byte) (b1 & 0xff), (byte) (b2 & 0xff), (byte) (b3 & 0xff), (byte) (b4 & 0xff), (byte) (b5 & 0xff), (byte) (b6 & 0xff), (byte) (b7 & 0xff), (byte) (b8 & 0xff), (byte) (b9 & 0xff), (byte) (b10 & 0xff), (byte) (b11 & 0xff), (byte) (b12 & 0xff), (byte) (b13 & 0xff), (byte) (b14 & 0xff), (byte) (b15 & 0xff), (byte) (b16 & 0xff)}; dlen = cidx - fidx; //log.debug("AAAA dlen=" + dlen); } } --- NEW FILE: MDRR.java --- package edu.harvard.syrah.sbon.async.comm.dns; /** This class provides the <code>MD</code> RR record structure mapping. <pre> MD RDATA format (Obsolete). 
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ / MADNAME / / / +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ where: MADNAME A <domain-name> which specifies a host which has a mail agent for the domain which should be able to deliver mail for the domain. MD records cause additional section processing which looks up an A type record corresponding to MADNAME. MD is obsolete. See the definition of MX and [RFC-974] for details of the new scheme. The recommended policy for dealing with MD RRs found in a master file is to reject them, or to convert them to MX RRs with a preference of 0. </pre> <hr> Copyright (c) 2001-2004, Gregg Wonderly All rights reserved. <p> Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: <p> Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. <p> Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. <p> Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission. <p> THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @author Gregg Wonderly - gre...@po... */ public class MDRR extends CNAMERR { String aname; String cname; int recs = 1; int dlen; int sidx; public MDRR( byte[]pkt, int cidx ) { super(pkt, cidx); } public String getMAdName() { return getCName(); } } --- NEW FILE: CNAMERR.java --- package edu.harvard.syrah.sbon.async.comm.dns; /** This class provides the <code>CNAME</code> RR record structure mapping. <pre> +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ / CNAME / / / +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ where: CNAME A <domain-name> which specifies the canonical or primary name for the owner. The owner name is an alias. CNAME RRs cause no additional section processing, but name servers may choose to restart the query at the canonical name in certain cases. See the description of name server logic in [RFC-1034] for details. </pre> <hr> Copyright (c) 2001-2004, Gregg Wonderly All rights reserved. <p> Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: <p> Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. <p> Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 
<p> Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission. <p> THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @author Gregg Wonderly - gre...@po... */ public class CNAMERR extends RR { String aname; String cname; int recs = 1; int dlen; int sidx; public String toString() { return aname+"\t\t"+super.toString()+"\t"+cname; } public CNAMERR( byte[]pkt, int cidx ) { super(pkt, cidx, DNSQuestion.TYPE_CNAME ); } public String getHost( ) { return aname; } public String getCName( ) { return cname; } public int getRecordCount() { return recs; } public int dataLength() { return dlen; } public void processRecord( int cidx ) { StringBuffer buf = new StringBuffer(); sidx = cidx; int fcidx = cidx; cidx = RR.extractName( rdata, cidx, rdata.length, buf ); aname = buf.toString(); cidx = setParameters(rdata, cidx); buf.setLength(0); cidx = RR.extractName( rdata, cidx, rdata.length, buf ); dlen = (cidx - fcidx); cname = buf.toString(); } } --- NEW FILE: TXTRR.java --- package edu.harvard.syrah.sbon.async.comm.dns; /** This class provides the <code>TXT</code> RR record structure mapping. <pre> TXT RDATA format. 
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ / TXT-DATA / +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ where: TXT-DATA One or more <character-string>s. TXT RRs are used to hold descriptive text. The semantics of the text depends on the domain where it is found. </pre> <hr> Copyright (c) 2001-2004, Gregg Wonderly All rights reserved. <p> Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: <p> Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. <p> Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. <p> Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission. <p> THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @author Gregg Wonderly - gre...@po... 
*/ public class TXTRR extends RR { protected String aname; protected String text; protected byte data[]; protected int dlen; protected int sidx; public String toString() { if( text == null ) text = new String(data); return aname+"\t\t"+super.toString()+"\t"+text; } public TXTRR( byte[]pkt, int cidx ) { super(pkt, cidx, DNSQuestion.TYPE_TXT ); } public String getText( ) { if( text == null ) text = new String(data); return text; } public int dataLength() { return dlen; } public void processRecord( int cidx ) { StringBuffer buf = new StringBuffer(); sidx = cidx; int fcidx = cidx; cidx = RR.extractName( rdata, cidx, rdata.length, buf ); aname = buf.toString(); cidx = setParameters(rdata, cidx); buf.setLength(0); data = new byte[ rdlength ]; int cnt = 0; while( cnt < rdlength ) { int l = rdata[cnt+cidx] & 0xff; for( int i = 0; i < l; ++i ) buf.append( (char)rdata[cnt+cidx+1+i] ); buf.append(' '); cnt += l + 1; } cidx += rdlength; data = buf.toString().getBytes(); dlen = (cidx - fcidx); } } --- NEW FILE: LICENSE.txt --- Copyright (c) 2001-2004, Gregg Wonderly All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--- NEW FILE: MFRR.java ---
package edu.harvard.syrah.sbon.async.comm.dns;

/**
This class provides the <code>MF</code> RR record structure mapping.
<pre>
MF RDATA format (Obsolete).

    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    /                   MADNAME                     /
    /                                               /
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

where:

MADNAME         A <domain-name> which specifies a host which has a mail
                agent for the domain which will accept mail for
                forwarding to the domain.

MF records cause additional section processing which looks up an A type
record corresponding to MADNAME.

MF is obsolete.  See the definition of MX and [RFC-974] for details of
the new scheme.  The recommended policy for dealing with MF RRs found in
a master file is to reject them, or to convert them to MX RRs with a
preference of 10.
</pre>
<hr>
Copyright (c) 2001-2004, Gregg Wonderly
All rights reserved.
<p>
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
<p>
Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
<p>
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
<p> Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission. <p> THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @author Gregg Wonderly - gre...@po... */ public class MFRR extends CNAMERR { String aname; String cname; int recs = 1; int dlen; int sidx; public MFRR( byte[]pkt, int cidx ) { super(pkt, cidx); } public String getMAdName() { return getCName(); } } --- NEW FILE: DNSQuestion.java --- package edu.harvard.syrah.sbon.async.comm.dns; /** * This class wraps the 3 pieces of information needed to execute a DNS query. * The string value of concern (a domain structured name) and then the type * and class of query. <p> <hr> Copyright (c) 2001-2004, Gregg Wonderly All rights reserved. <p> Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: <p> Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 
<p> Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. <p> Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission. <p> THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @author Gregg Wonderly - gre...@po... */ public class DNSQuestion { protected byte data[]; protected int len; protected String name; protected int qtype; protected int qclass; /** The Internet class */ public static final int CLASS_IN = 1; /** The "A" record/query type. */ public static final int TYPE_A = 1; /** The "NS" record/query type. */ public static final int TYPE_NS = 2; /** The "MD" record/query type. */ public static final int TYPE_MD = 3; /** The "MF" record/query type. */ public static final int TYPE_MF = 4; /** The "CNAME" record/query type. */ public static final int TYPE_CNAME = 5; /** The "SOA" record/query type. */ public static final int TYPE_SOA = 6; /** The "MB" record/query type. */ public static final int TYPE_MB = 7; /** The "MG" record/query type. 
*/ public static final int TYPE_MG = 8; /** The "MR" record/query type. */ public static final int TYPE_MR = 9; /** The "NULL" record/query type. */ public static final int TYPE_NULL = 10; /** The "WKS" record/query type. */ public static final int TYPE_WKS = 11; /** The "PTR" record/query type. */ public static final int TYPE_PTR = 12; /** The "HINFO" record/query type. */ public static final int TYPE_HINFO = 13; /** The "MINFO" record/query type. */ public static final int TYPE_MINFO = 14; /** The "MX" record/query type. */ public static final int TYPE_MX = 15; /** The "TXT" record/query type. */ public static final int TYPE_TXT = 16; /** The "AAAA" record/query type. */ public static final int TYPE_AAAA = 28; /** * Formats a time in seconds into a string describing the amount of * time indicated. * * e.g. 601 becomes "10m1s" for 10 minutes 1 second. */ public static String toTime( int ttl ) { String buf = ""; while( ttl > 0 ) { int v = 0; if( ttl < 60 ) { buf += ttl+"s"; ttl = 0; } else if( ttl < 3600 ) { v = ttl / (60); buf += v+"m"; ttl = ttl % 60; } else if( ttl < 3600 * 24 ) { v = ttl / 3600; buf += v+"h"; ttl = ttl % 3600; } else if( ttl < (3600 * 24 * 7) ) { v = ttl / (3600 * 24); buf += v+"d"; ttl = ttl % (3600 * 24); } else { v = ttl / (3600 * 24 * 7); buf += v+"w"; ttl = ttl % (3600*24 * 7); } } return buf; } /** Formats the question data to a descriptive string */ public String toString() { return toType(qtype)+"["+toClass(qclass)+"] for "+name; } /** * Returns a string describing the indicated CLASS_* value. * currently only IN is supported */ public static String toClass( int cls ) { String buf = ""+cls; if( cls == CLASS_IN ) buf = "IN"; return buf; } /** * Translates one of the TYPE_* values into a string * that is the part after the TYPE_ part of the variable * name. */ public static String toType( int type ) { return RR.toType( type ); } /** * Used to change the name to reuse this Question for another query. 
* * @param name the name string to ask a question about. */ public void setName( String name ) { this.name = name; } /** * Used to change the type to reuse this Question for another query. * @param type one of the TYPE_* values. */ public void setType( int type ) { this.qtype = type; } /** * Used to change the class to reuse this Question for another query. * @param cls one of the CLASS_* values */ public void setClass( int cls ) { this.qclass = cls; } /** * Constructs a new Question. The query data is formatted * from this constructor and placed into the <code>data</code> * buffer. * * @param name the domain string to query about * @param qtype one of the TYPE_* query types. * @param qclass one of the CLASS_* values. */ public DNSQuestion( String name, int qtype, int qclass ) { int nlen = RR.nameLength(name); this.name = name; this.qtype = qtype; this.qclass = qclass; data = new byte[nlen + 4]; RR.copyInName( data, 0, name ); data[ nlen ] = (byte)(qtype>>8); data[ nlen+1 ] = (byte)(qtype & 0xff); data[ nlen+2 ] = (byte)(qclass>>8); data[ nlen+3 ] = (byte)(qclass & 0xff); len = nlen+4; } /** * Returns the total length in bytes of the query data buffer */ public int length() { return len; } /** Returns the data buffer for the query */ public byte[] getData() { return data; } } --- NEW FILE: MXRR.java --- package edu.harvard.syrah.sbon.async.comm.dns; /** This class provides the <code>MX</code> RR record structure mapping. <pre> MX RDATA format. +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ | PREFERENCE | +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ / EXCHANGE / / / +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ where: PREFERENCE A 16 bit integer which specifies the preference given to this RR among others at the same owner. Lower values are preferred. EXCHANGE A <domain-name> which specifies a host willing to act as a mail exchange for the owner name. MX records cause type A additional section processing for the host specified by EXCHANGE. 
The use of MX RRs is explained in detail in [RFC-974]. </pre> <hr> Copyright (c) 2001-2004, Gregg Wonderly All rights reserved. <p> Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: <p> Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. <p> Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. <p> Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission. <p> THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @author Gregg Wonderly - gre...@po... 
 */
public class MXRR extends RR {
    int pref;
    String mxname;
    String mxfor;
    int dlen;
    int sidx;

    public String toString() {
        return mxfor+"\t\t"+super.toString()+"\t "+pref+" "+mxname;
    }

    public MXRR( byte[]pkt, int cidx ) {
        super(pkt, cidx, DNSQuestion.TYPE_MX );
    }

    public int getPreference() { return pref; }
    public String getExchanger() { return mxname; }
    public int dataLength() { return dlen; }

    public void processRecord( int cidx ) {
        StringBuffer buf = new StringBuffer();
        sidx = cidx;
        int fcidx = cidx;
        cidx = RR.extractName( rdata, cidx, rdata.length, buf );
        mxfor = buf.toString();
        cidx = setParameters(rdata, cidx);
        pref = ((rdata[cidx]&0xff)<<8) | (rdata[cidx+1]&0xff);
        cidx += 2; // preference
        buf.setLength(0);
        cidx = RR.extractName( rdata, cidx, rdata.length, buf );
        dlen = (cidx - fcidx);
        mxname = buf.toString();
    }
}

--- NEW FILE: HINFORR.java ---
package edu.harvard.syrah.sbon.async.comm.dns;

/**
This class provides the <code>HINFO</code> RR record structure mapping.
<pre>
HINFO RDATA format.

    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    /                      CPU                      /
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    /                       OS                      /
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

where:

CPU             A <character-string> which specifies the CPU type.

OS              A <character-string> which specifies the operating
                system type.

Standard values for CPU and OS can be found in [RFC 1010].

HINFO records are used to acquire general information about a host.  The
main use is for protocols such as FTP that can use special procedures
when talking between machines or operating systems of the same type.
</pre>
<hr>
Copyright (c) 2001-2004, Gregg Wonderly
All rights reserved.
<p>
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
<p>
Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
<p> Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. <p> Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission. <p> THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @author Gregg Wonderly - gre...@po... 
 */
public class HINFORR extends RR {
    String aname;
    String cpu;
    String os;
    int dlen;
    int sidx;

    public String toString() {
        return aname+"\t\t"+super.toString()+"\t"+cpu+" running "+os;
    }

    public HINFORR( byte[]pkt, int cidx ) {
        super(pkt, cidx, DNSQuestion.TYPE_HINFO );
    }

    public String getCPU( ) { return cpu; }
    public String getOS( ) { return os; }
    public int dataLength() { return dlen; }

    public void processRecord( int cidx ) {
        StringBuffer buf = new StringBuffer();
        sidx = cidx;
        int fcidx = cidx;
        cidx = RR.extractName( rdata, cidx, rdata.length, buf );
        aname = buf.toString();
        cidx = setParameters(rdata, cidx);
        buf.setLength(0);
        // Each <character-string> is a length byte followed by that many characters.
        int len = rdata[cidx++] & 0xff;
        for( int i = 0; i < len; ++i )
            buf.append( (char)rdata[i+cidx] );
        cidx += len;
        cpu = buf.toString();
        buf.setLength(0);
        len = rdata[cidx++] & 0xff;
        for( int i = 0; i < len; ++i )
            buf.append( (char)rdata[i+cidx] );
        os = buf.toString();
        cidx += len;
        dlen = (cidx - fcidx);
    }
}

--- NEW FILE: WKSRR.java ---
package edu.harvard.syrah.sbon.async.comm.dns;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Enumeration;
import java.util.Vector;

/**
This class provides the <code>WKS</code> RR record structure mapping.
<pre>
WKS RDATA format.

    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    | 0  1  2  3  4  5  6  ADDRESS 10 11 12 13 14 15|
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    |       PROTOCOL        |                       |
    +--+--+--+--+--+--+--+--+                       |
    |                                               |
    /                   <BIT MAP>                   /
    /                                               /
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

where:

ADDRESS   - A 32 bit ARPA Internet address
PROTOCOL  - An 8 bit IP protocol number
<BIT MAP> - A variable length bit map.  The bit map must be a multiple of 8 bits long.

The WKS record is used to describe the well known services supported by a particular protocol on a particular internet address. The PROTOCOL field specifies an IP protocol number, and the bit map has one bit per port of the specified protocol. The first bit corresponds to port 0, the second to port 1, etc.
If less than 256 bits are present, the remainder are assumed to be zero. The appropriate values for ports and protocols are specified in [13]. For example, if PROTOCOL=TCP (6), the 26th bit corresponds to TCP port 25 (SMTP). If this bit is set, a SMTP server should be listening on TCP port 25; if zero, SMTP service is not supported on the specified address. The anticipated use of WKS RRs is to provide availability information for servers for TCP and UDP. If a server supports both TCP and UDP, or has multiple Internet addresses, then multiple WKS RRs are used. WKS RRs cause no additional section processing. The RDATA section of a WKS record consists of a decimal protocol number followed by mnemonic identifiers which specify bits to be set to 1. </pre> <hr> Copyright (c) 2001-2005, Gregg Wonderly All rights reserved. <p> Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: <p> Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. <p> Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. <p> Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission. <p> THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * @author Gregg Wonderly - gre...@po...
 */
public class WKSRR extends RR {
    String host;
    String addr;
    int recs = 1;
    int dlen;
    int proto;
    Vector<Integer> ports;

    public String toString() {
        return host+"\t\t"+super.toString()+"\t"+addr+", proto="+proto+", ports="+ports;
    }

    public WKSRR( byte[]pkt, int cidx ) {
        super(pkt, cidx, DNSQuestion.TYPE_WKS );
    }

    public String getHost() { return host; }

    public InetAddress getHostAddress() throws IOException {
        return InetAddress.getByName( addr );
    }

    public int getRecordCount() { return recs; }
    public int dataLength() { return dlen; }

    public Enumeration<Integer> getPortsList() { return ports.elements(); }

    public void processRecord( int cidx ) {
        ports = new Vector<Integer>(10);
        StringBuffer buf = new StringBuffer();
        int nmlen = cidx;
        int fidx = cidx;
        cidx = RR.extractName( rdata, cidx, rdata.length, buf );
        host = buf.toString();
        nmlen = cidx - nmlen;
        cidx = setParameters( rdata, cidx );
        int b1 = rdata[cidx++];
        int b2 = rdata[cidx++];
        int b3 = rdata[cidx++];
        int b4 = rdata[cidx++];
        addr = (b1&0xff)+"."+ (b2&0xff)+"."+ (b3&0xff)+"."+ (b4&0xff);
        proto = rdata[cidx++] & 0xff;
        // The bit map is what remains of RDATA after the 4 address bytes
        // and the protocol byte.
        int cnt = rdlength - 5;
        for( int i = 0; i < cnt; ++i ) {
            int v = rdata[i + cidx] & 0xff;
            // System.out.println("data["+(i+cidx)+"]: "+v );
            for( int j = 7; j >= 0; --j ) {
                // The most significant bit of each byte is the lowest-numbered port.
                int bit = (i * 8) + (7-j);
                // System.out.println( "bit no: "+bit+", set? "+(v & (1<<j)));
                if( (v & (1<<j)) != 0 ) {
                    ports.addElement( Integer.valueOf( bit ) );
                }
            }
        }
        dlen = (cidx - fidx) + cnt;
    }
}

--- NEW FILE: MGRR.java ---
package edu.harvard.syrah.sbon.async.comm.dns;

/**
This class provides the <code>MG</code> RR record structure mapping.
<pre>
MG RDATA format (EXPERIMENTAL).

    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    /                   MGMNAME                     /
    /                                               /
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

where:

MGMNAME         A <domain-name> which specifies a mailbox which is a
                member of the mail group specified by the domain name.

MG records cause no additional section processing.
</pre>
<hr>
Copyright (c) 2001-2004, Gregg Wonderly
All rights reserved.
<p>
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
<p>
Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
<p>
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
<p>
Neither the name of Gregg Wonderly nor the names of contributors to this software may be used to endorse or promote products derived from this software without specific prior written permission.
<p>
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @author Gregg Wonderly - gre...@po... */ public class MGRR extends CNAMERR { String emailbx; public MGRR( byte[]pkt, int cidx ) { super(pkt, cidx); } public String getRMailBX() { return getCName(); } public String getEMailBX() { return emailbx; } public void processRecord( int cidx ) { int fcidx = cidx; super.processRecord( cidx ); cidx += dlen; // StringBuffer buf = new StringBuffer(); // cidx = RR.extractName( rdata, cidx, rdata.length, buf ); // emailbx = buf.toString(); dlen = (cidx - fcidx); } } --- NEW FILE: SOARR.java --- package edu.harvard.syrah.sbon.async.comm.dns; /** This class provides the <code>SOA</code> RR record structure mapping. <pre> SOA RDATA format. +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ / MNAME / / / +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ / RNAME / +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ | SERIAL | | | +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ | REFRESH | | | +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ | RETRY | | | +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ | EXPIRE | | | +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ | MINIMUM | | | +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ where: MNAME The <domain-name> of the name server that was the original or primary source of data for this zone. RNAME A <domain-name> which specifies the mailbox of the person responsible for this zone. SERIAL The unsigned 32 bit version number of the original copy of the zone. 
                Zone transfers preserve this value. This value wraps and
                should be compared using sequence space arithmetic.

REFRESH         A 32 bit time interval before the zone should be
                refreshed.

RETRY           A 32 bit time interval that should elapse before a
                failed refresh should be retried.

EXPIRE          A 32 bit time value that specifies the upper limit on
                the time interval that can elapse before the zone is no
                longer authoritative.

MINIMUM         The unsigned 32 bit minimum TTL field that should be
                exported with any RR from this zone.

SOA records cause no additional section processing.

All times are in units of seconds.

Most of these fields are pertinent only for name server maintenance
operations. However, MINIMUM is used in all query operations that
retrieve RRs from a zone. Whenever a RR is sent in a response to a
query, the TTL field is set to the maximum of the TTL field from the RR
and the MINIMUM field in the appropriate SOA. Thus MINIMUM is a lower
bound on the TTL field for all RRs in a zone. Note that this use of
MINIMUM should occur when the RRs are copied into the response and not
when the zone is loaded from a master file or via a zone transfer. The
reason for this provision is to allow future dynamic update facilities to
change the SOA RR with known semantics.
</pre>
<hr>
Copyright (c) 2001-2004, Gregg Wonderly
All rights reserved.
<p>
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
<p>
Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
<p>
Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
<p>
Neither the name of Gregg Wonderly nor the names of contributors to this
software may be used to endorse or promote products derived from this
software without specific prior written permission.
<p>
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
 *
 * @author Gregg Wonderly - gre...@po...
 */
public class SOARR extends RR {
    String rname;
    String mname;
    String dname;
    int recs = 1;
    int dlen;
    int serial;
    int refresh;
    int retry;
    int expire;
    int minimum;

    public String toString() {
        return dname+"\t\t"+super.toString()+"\t"+mname+", "+rname+"\n"+
            " SERIAL "+serial+"\n"+
            " REFRESH "+DNSQuestion.toTime(refresh)+"\n"+
            " RETRY "+DNSQuestion.toTime(retry)+"\n"+
            " EXPIRE "+DNSQuestion.toTime(expire)+"\n"+
            " MINIMUM "+DNSQuestion.toTime(minimum)
            ;
    }

    public SOARR( byte[]pkt, int cidx ) {
        super(pkt, cidx, DNSQuestion.TYPE_SOA);
    }

    public String getNSHost() {
        return mname;
    }

    public String getResponsibleEmail() {
        return rname;
    }

    public int getRecordCount() {
        return recs;
    }

    public int dataLength() {
        return dlen;
    }

    public void processRecord( int cidx ) {
        StringBuffer buf = new StringBuffer();
        int fidx = cidx;
        cidx = extractName( rdata, cidx, rdata.length, buf );
        dname = buf.toString();
        cidx = setParameters( rdata, cidx );
        buf.setLength(0);
        cidx = extractName( rdata, cidx, rdata.length, buf );
        mname = buf.toString();
        buf.setLength(0);
        cidx = RR.extractName( rdata, cidx, rdata.length, buf );
        rname = buf.toString();
        serial = ((rdata[cidx]&0xff) << 24) |
            ((rdata[cidx+1] & 0xff)<<16) |
            ((rdata[cidx+2]&0xff) << 8) |
            (rdata[cidx+3] & 0xff);
        cidx += 4;
        refresh = ((rdata[cidx]&0xff) << 24) |
            ((rdata[cidx+1] & 0xff)<<16) |
            ((rdata[cidx+2]&0xff) << 8) |
            (rdata[cidx+3] & 0xff);
        cidx += 4;
        retry = ((rdata[cidx]&0xff) << 24) |
            ((rdata[cidx+1] & 0xff)<<16) |
            ((rdata[cidx+2]&0xff) << 8) |
            (rdata[cidx+3] & 0xff);
        cidx += 4;
        expire = ((rdata[cidx]&0xff) << 24) |
            ((rdata[cidx+1] & 0xff)<<16) |
            ((rdata[cidx+2]&0xff) << 8) |
            (rdata[cidx+3] & 0xff);
        cidx += 4;
        minimum = ((rdata[cidx]&0xff) << 24) |
            ((rdata[cidx+1] & 0xff)<<16) |
            ((rdata[cidx+2]&0xff) << 8) |
            (rdata[cidx+3] & 0xff);
        cidx += 4;
        dlen = cidx - fidx;
        // System.out.println("cidx: "+cidx);
    }
}

--- NEW FILE: MRRR.java ---
package edu.harvard.syrah.sbon.async.comm.dns;

/**
This class provides the <code>MR</code> RR record structure mapping.
<pre>
MR RDATA format (EXPERIMENTAL).

    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    /                    NEWNAME                    /
    /                                               /
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

where:

NEWNAME         A <domain-name> which specifies a mailbox which is the
                proper rename of the specified mailbox.

MR records cause no additional section processing. The main use for MR
is as a forwarding entry for a user who has moved to a different
mailbox.
</pre>
<hr>
Copyright (c) 2001-2004, Gregg Wonderly
All rights reserved.
<p>
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
<p>
Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
<p>
Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
<p>
Neither the name of Gregg Wonderly nor the names of contributors to this
software may be used to endorse or promote products derived from this
software without specific prior written permission.
<p>
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
 *
 * @author Gregg Wonderly - gre...@po...
 */
public class MRRR extends CNAMERR {
    String emailbx;

    public String toString() {
        return aname+"\t\t"+super.toString()+"\t"+cname;
    }

    public MRRR( byte[]pkt, int cidx ) {
        super(pkt, cidx);
    }

    public void processRecord( int cidx ) {
        int fcidx = cidx;
        super.processRecord( cidx );
        cidx += dlen;
        StringBuffer buf = new StringBuffer();
        cidx = RR.extractName( rdata, cidx, rdata.length, buf );
        emailbx = buf.toString();
        dlen = (cidx - fcidx);
    }
}

--- NEW FILE: MINFORR.java ---
package edu.harvard.syrah.sbon.async.comm.dns;

/**
This class provides the <code>MINFO</code> RR record structure mapping.
<pre>
MINFO RDATA format (EXPERIMENTAL).

    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    /                    RMAILBX                    /
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
    /                    EMAILBX                    /
    +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

where:

RMAILBX         A <domain-name> which specifies a mailbox which is
                responsible for the mailing list or mailbox.
                If this domain name names the root, the owner of the
                MINFO RR is responsible for itself. Note that many
                existing mailing lists use a mailbox X-request for the
                RMAILBX field of mailing list X, e.g., Msgroup-request
                for Msgroup. This field provides a more general
                mechanism.

EMAILBX         A <domain-name> which specifies a mailbox which is to
                receive error messages related to the mailing list or
                mailbox specified by the owner of the MINFO RR (similar
                to the ERRORS-TO: field which has been proposed). If
                this domain name names the root, errors should be
                returned to the sender of the message.

MINFO records cause no additional section processing. Although these
records can be associated with a simple mailbox, they are usually used
with a mailing list.
</pre>
<hr>
Copyright (c) 2001-2004, Gregg Wonderly
All rights reserved.
<p>
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
<p>
Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
<p>
Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
<p>
Neither the name of Gregg Wonderly nor the names of contributors to this
software may be used to endorse or promote products derived from this
software without specific prior written permission.
<p>
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * @author Gregg Wonderly - gre...@po...
 */
public class MINFORR extends CNAMERR {
    String emailbx;

    public String toString() {
        return super.toString()+", "+emailbx;
    }

    public MINFORR( byte[]pkt, int cidx ) {
        super(pkt, cidx);
    }

    public String getRMailBX() {
        return getCName();
    }

    public String getEMailBX() {
        return emailbx;
    }

    public void processRecord( int cidx ) {
        int fcidx = cidx;
        super.processRecord( cidx );
        cidx += dlen;
        StringBuffer buf = new StringBuffer();
        cidx = RR.extractName( rdata, cidx, rdata.length, buf );
        emailbx = buf.toString();
        dlen = (cidx - fcidx);
    }
}

--- NEW FILE: DNSComm.java ---
/*
 * @author Last modified by $Author: prp $
 * @version $Revision: 1.1 $ on $Date: 2007/06/04 17:55:40 $
 * @since Aug 10, 2005
 */
package edu.harvard.syrah.sbon.async.comm.dns;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;

import sun.net.dns.ResolverConfiguration;

import edu.harvard.syrah.prp.Log;
import edu.harvard.syrah.prp.NetUtil;
import edu.harvard.syrah.prp.PUtil;
import edu.harvard.syrah.sbon.async.CBResult;
import edu.harvard.syrah.sbon.async.EL;
import edu.harvard.syrah.sbon.async.CallbacksIF.CB0;
import edu.harvard.syrah.sbon.async.CallbacksIF.CB1;
import edu.harvard.syrah.sbon.async.EL.Priority;
import edu.harvard.syrah.sbon.async.comm.*;

/**
 * Communication module for DNS
 *
 */
public class DNSComm extends UDPComm implements DNSCommIF {
    protected static final Log log = new Log(DNSComm.class);

    private static final long START_DNS_TIMEOUT = 1000; // 1000 ms

    private Map<String, Integer> dnsCache = new HashMap<String, Integer>();
    private Map<Integer, String> reverseDNSCache = new HashMap<Integer, String>();

    int MAX_DNS_RETRY_COUNT = 10;

    private Set<AddressIF> dnsServerList;

    public DNSComm() {
        super(Priority.NORMAL);
        log.debug("Initialising DNSComm...");
    }

    @SuppressWarnings("unchecked")
    public void initServer(CB0 cbInit) {
        List<String> nsList = ResolverConfiguration.open().nameservers();
        // log.debug("dnsServers=" + Util.toString(nsList));
        dnsServerList = new HashSet<AddressIF>();
        int i;
        for (i = 0; i < nsList.size(); i++) {
            dnsServerList.add(AddressFactory.createUnresolved(nsList.get(i), 53));
        }
        log.debug("Initialising DNSComm with " + i + " DNS servers: " + dnsServerList);
        // dnsServer = AddressFactory.createUnresolved("192.168.5.10", 53);

        // Bind this to the wildcard address - does this mean that we'll see all UDP
        // packets?
        // AddressIF localBindAddress = AddressFactory.createServer(0);
        // super.initServer(localBindAddress, cbInit);
    }

    public void getHostByName(final String hostname, final CB1<InetAddress> cbInetAddress) {
        getIPAddrByName(hostname, new CB1<byte[]>() {
            protected void cb(CBResult result, byte[] byteIPAddr) {
                switch (result.state) {
                    case OK: {
                        InetAddress ipAddr = null;
                        try {
                            ipAddr = InetAddress.getByAddress(hostname, byteIPAddr);
                        } catch (UnknownHostException e) {
                            log.error("Unknown host after IP address resolution? " + e);
                        }
                        cbInetAddress.call(result, ipAddr);
                        break;
                    }
                    case TIMEOUT: {
                        cbInetAddress.call(CBResult.TIMEOUT("DNS timeout for host=" + hostname), null);
                        break;
                    }
                    case ERROR: {
                        cbInetAddress.call(result, null);
                        break;
                    }
                }
            }
        });
    }

    public void getIPAddrByName(final String hostname, final CB1<byte[]> cbByteIPAddr) {
        // log.debug("getIPAddrByName.hostname=" + hostname);

        // Check the cache
        final byte[] cachedByteIPAddr = lookupCacheHostname(hostname);
        if (cachedByteIPAddr != null) {
            log.debug("DNS cache hit: cachedByteIPAddr=" + NetUtil.byteIPAddrToString(cachedByteIPAddr));
            EL.get().registerTimerCB(new CB0() {
                protected void cb(CBResult result) {
                    cbByteIPAddr.call(CBResult.OK(), cachedByteIPAddr);
                }
            });
            return;
        }
        doQueryByName(hostname, cbByteIPAddr, 0);
    }

    public AddressIF getDNSServer() {
        return PUtil.getRandomObject(dnsServerList);
    }

    public void doQueryByName(final String hostname, final CB1<byte[]> cbByteIPAddr, final int queryCount) {
        DNSQuestion aQuestion = new DNSQuestion(hostname, DNSQuestion.TYPE_A, DNSQuestion.CLASS_IN);
        DNSQuery q = new DNSQuery(new DNSQuestion[] { aQuestion });
        final AddressIF dnsServer = getDNSServer();
        log.debug("Sending DNS query to " + dnsServer);
        long dnsTimeout = START_DNS_TIMEOUT * (queryCount + 1);
        q.runQuery(this, dnsServer, new CB1<DNSQuery>(dnsTimeout) {
            protected void cb(CBResult result, DNSQuery query) {
                switch (result.state) {
                    case OK: {
                        byte[] byteIPAddr = null;
                        for (RR rr : query.getAnswers()) {
                            if (rr instanceof ARR) {
                                ARR arr = (ARR) rr;
                                byteIPAddr = arr.byteAddr;
                            }
                        }
                        if (byteIPAddr != null) {
                            updateCache(hostname, byteIPAddr);
                            cbByteIPAddr.call(CBResult.OK(), byteIPAddr);
                        } else {
                            log.debug("Unknown host=" + hostname);
                            cbByteIPAddr.call(CBResult.ERROR("Unknown host: " + hostna... [truncated message content] |
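Editor's note on the SOARR.java source above: processRecord repeats the same four-byte big-endian read for each of the five SOA counters and stores the result in a signed int, so a SERIAL above 2^31-1 would print as a negative number. The Javadoc also says SERIAL "should be compared using sequence space arithmetic", but the commit shows no such comparison. The following is a minimal sketch of both ideas, not code from the repository. The class SoaFields and the methods readUInt32 and compareSerial are hypothetical names chosen for illustration, and the comparison follows the usual modulo-2^32 signed-difference approach (in the style of RFC 1982) as an assumption about what the Javadoc intends.

```java
/** Hypothetical helper for illustration only; not part of the committed sources. */
final class SoaFields {
    private SoaFields() {}

    /** Reads four bytes starting at offset as an unsigned big-endian 32-bit value. */
    static long readUInt32(byte[] data, int offset) {
        if (offset < 0 || data.length - offset < 4) {
            throw new IllegalArgumentException("need 4 bytes at offset " + offset);
        }
        // Mask each byte with 0xffL so sign extension cannot leak into the result.
        return ((data[offset] & 0xffL) << 24)
             | ((data[offset + 1] & 0xffL) << 16)
             | ((data[offset + 2] & 0xffL) << 8)
             |  (data[offset + 3] & 0xffL);
    }

    /**
     * Compares two 32-bit serial numbers in sequence space.
     * Returns a negative value if a precedes b, a positive value if a follows b,
     * and 0 if they are equal. Serials exactly 2^31 apart have no defined order;
     * this sketch arbitrarily reports them as "a precedes b".
     */
    static int compareSerial(long a, long b) {
        // The cast keeps the low 32 bits, so the difference wraps modulo 2^32.
        int diff = (int) (a - b);
        return Integer.compare(diff, 0);
    }

    public static void main(String[] args) {
        byte[] rdata = {0, 0, 1, 2};
        System.out.println(readUInt32(rdata, 0));
        // A serial that has wrapped past 2^32-1 still counts as newer.
        System.out.println(compareSerial(1L, 4294967295L) > 0);
    }
}
```

With a helper like this, each of the five assignments in processRecord would shrink to one call followed by cidx += 4. Whether the fields should become long is a separate design choice, since DNSQuestion.toTime is called with int arguments in the source above.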
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:25
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21417/src/edu/harvard/syrah/sbon/async

Log Message:
Directory /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async added to the repository |
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:23
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21417/src/edu/harvard

Log Message:
Directory /cvsroot/pyxida/AsyncJ/src/edu/harvard added to the repository |
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:23
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/scriptroute
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21417/src/edu/harvard/syrah/sbon/async/comm/scriptroute

Log Message:
Directory /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/scriptroute added to the repository |
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:23
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/xmlrpc
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21417/src/edu/harvard/syrah/sbon/async/comm/xmlrpc

Log Message:
Directory /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/xmlrpc added to the repository |
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:20
|
Update of /cvsroot/pyxida/AsyncJ/src/edu
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21417/src/edu

Log Message:
Directory /cvsroot/pyxida/AsyncJ/src/edu added to the repository |
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:20
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21417/src/edu/harvard/syrah/sbon

Log Message:
Directory /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon added to the repository |
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:20
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21417/src/edu/harvard/syrah/sbon/async/comm

Log Message:
Directory /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm added to the repository |
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:19
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21417/src/edu/harvard/syrah

Log Message:
Directory /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah added to the repository |
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:19
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/http
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21417/src/edu/harvard/syrah/sbon/async/comm/http

Log Message:
Directory /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/http added to the repository |
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:19
|
Update of /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/obj
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21417/src/edu/harvard/syrah/sbon/async/comm/obj

Log Message:
Directory /cvsroot/pyxida/AsyncJ/src/edu/harvard/syrah/sbon/async/comm/obj added to the repository |
|
From: Peter P. <pr...@us...> - 2007-06-04 17:55:19
|
Update of /cvsroot/pyxida/AsyncJ/src
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21417/src

Log Message:
Directory /cvsroot/pyxida/AsyncJ/src added to the repository |