|
From: Peter P. <pr...@us...> - 2006-11-27 11:10:51
|
Update of /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/nc In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv3812/src/edu/harvard/syrah/pyxida/nc Modified Files: NCManager.java GossipResponseMsg.java GossipRequestMsg.java Log Message: Addressed some (all?) of Jon's TODO. Jon, grep for "TODO JTL". Index: NCManager.java =================================================================== RCS file: /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/nc/NCManager.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** NCManager.java 27 Nov 2006 04:06:37 -0000 1.4 --- NCManager.java 27 Nov 2006 11:10:48 -0000 1.5 *************** *** 1,288 **** package edu.harvard.syrah.pyxida.nc; import java.util.HashSet; import java.util.Set; - import edu.harvard.syrah.pyxida.nc.lib.*; import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.pyxida.nc.lib.NCClient; import edu.harvard.syrah.pyxida.ping.PingManager; import edu.harvard.syrah.sbon.comm.AddressIF; ! import edu.harvard.syrah.sbon.comm.obj.*; ! import edu.harvard.syrah.sbon.async.*; ! import edu.harvard.syrah.sbon.async.CallbacksIF.*; // TODO add proxy coordinates public class NCManager { ! static Log log = new Log(NCManager.class); ! ! // TODO read from config file ! private static final int NC_NUM_DIMS = 5; ! // Height is set within NCClient and is the "last dimension" ! // of the coordinates array. ! // Thus, setting this to 5 is 4d+height if height is in use. ! // This allows the size on the wire to remain constant ! // which is important for Azureus integration. ! ! // TODO PRP read from config file ! //private String bootstrapList[] = ("glenora", "sb01", "sb02", "sb10", "sb11"); ! // TODO PRP read from config file ! public static final long UPDATE_DELAY = 10*1000; // 10 seconds ! public static final long DEFAULT_PROXY_LEASE = 3600 * 1000; ! private PingManager pingManager = null; ! ! private Set<ProxyClient> proxyClients = new HashSet<ProxyClient>(); ! final ObjCommIF comm; ! final CoordClient localNC; ! ! public NCManager(ObjCommIF _comm, PingManager pingManager) { ! comm = _comm; ! this.pingManager = pingManager; ! // Initialise the local coord first ! localNC = new CoordClient (); ! } ! public void init() { ! comm.registerMessageCB(GossipRequestMsg.class, new GossipHandler()); ! // TODO PRP ! /* ! foreach (String remoteNode : bootstrapList) { ! // resolve and add him ! } ! */ ! localNC.init(); ! ! // Also ping for any proxy coordinates. ! } ! public void createProxyCoord(AddressIF remoteNode, long lease) { ! // TODO ! } ! public void destroyProxyCoord(AddressIF remoteNode) { ! // TODO ! } ! public Coordinate getLocalCoord() { ! return localNC.nc.getSystemCoords(); ! } ! public double getLocalError() { ! return localNC.nc.getSystemError(); ! } ! public Coordinate getProxyCoord(AddressIF remoteNode) { ! // TODO ! return null; ! } ! public double estimateRTT(AddressIF nodeA, AddressIF nodeB) { ! // TODO ! return 0; ! } ! /* ! // TODO PRP low priority ! public void startUp(DataInputStream is) throws IOException { ! // TODO ! } ! public void shutDown(DataOutputStream os) throws IOException { ! // TODO ! } ! */ ! ! abstract class ResponseObjCommCB<T extends ObjMessageIF> extends ObjCommCB<T> { ! void sendResponseMessage (final String handler, final AddressIF remoteAddr, final ObjMessage response, long requestMsgId, final String errorMessage, final CB1<Boolean> cbHandled) { ! CBResult result = null; ! if (errorMessage != null) { ! log.debug(handler+ " :"+errorMessage); ! result = CBResult.ERROR(errorMessage); ! } else { ! result = CBResult.OK(); ! } ! ! comm.sendResponseMessage(response, remoteAddr, requestMsgId, new CB0() { ! protected void cb(CBResult sendResult) { ! switch (sendResult.state) { ! case TIMEOUT: ! case ERROR: { ! log.warn(handler+": "+sendResult.what); ! return; ! } ! } ! }}); ! cbHandled.call(CBResult.OK(), true); ! } ! } ! class GossipHandler extends ResponseObjCommCB<GossipRequestMsg> { ! public void cb(CBResult result, GossipRequestMsg msg, ! AddressIF remoteAddr, Long ts, ! final CB1<Boolean> cbHandled) { ! log.debug ("in GossipHandler cb: "+msg); ! long curr_time = System.currentTimeMillis(); ! sendResponseMessage ! ("Gossip", remoteAddr, ! new GossipResponseMsg(localNC.nc.getSystemCoords(), localNC.nc.getSystemError(), localNC.nc.getAge(curr_time), getGossipNodes(msg.coordinate)), ! msg.getMsgId(), null, cbHandled); ! ! } ! } ! Set<AddressIF> getGossipNodes (Coordinate remoteCoord) { ! // TODO return coordinates, possibly based on this guy's coordinate ! return null; ! } ! void addGossipNodes (Set<AddressIF> nodes) { ! // TODO add nodes to the set that we know about ! } ! AddressIF getGossipNode () { ! // TODO return a guy to gossip with ! return null; ! } ! class CoordClient { ! final NCClient<AddressIF> nc; ! public CoordClient () { ! nc = new NCClient<AddressIF>(NCManager.NC_NUM_DIMS); } ! public void init() { ! // TODO continuous timer ! // really would prefer randomly distributed timings ! // or an adaptive delay :) ! EventLoop.get().registerTimerCB(UPDATE_DELAY, new CB0() { ! protected void cb(CBResult result) { ! update(); ! // TODO PRP result.call(CBResult.OK()); ?? ! } ! }); } ! void update () { ! final AddressIF neighbor = pickGossipNode (); ! // send him a gossip msg ! // TODO could bias which nodes are sent based on his coord ! GossipRequestMsg msg = ! new GossipRequestMsg(localNC.nc.getSystemCoords(),getGossipNodes(null)); ! comm.sendRequestMessage(msg, neighbor, new ObjCommCB<GossipResponseMsg>() { ! protected void cb(CBResult result, final GossipResponseMsg responseMsg, ! AddressIF remoteAddr, Long ts, CB1<Boolean> cbHandled) { ! // TODO can use time of this instead of ping time ! // if we want to not use jpcap pings ! // (running not at root) ! switch (result.state) { ! case OK: { ! // keep track of new guys he's told us about ! addGossipNodes(responseMsg.nodes); ! // and ping him ! pingManager.addPingRequest(comm.getLocalAddress(), neighbor, new CB1<Float>() { ! protected void cb(CBResult pingResult, Float latency) { ! switch (pingResult.state) { ! case OK:{ ! // and update our coordinate ! // TODO convert nclib to use floats ! long lat_ms = (long) Math.round(latency); ! long curr_time = System.currentTimeMillis(); ! localNC.nc.processSample(neighbor, responseMsg.remoteCoordinate, responseMsg.remoteError, lat_ms, responseMsg.remoteAge, curr_time, true); ! break; ! } ! case TIMEOUT: ! case ERROR: { ! log.warn("Ping to "+neighbor+" failed"); ! break; ! } } - } }); - - break; - } - case ERROR: - case TIMEOUT: { - log.warn("Did not receive gossip response from "+neighbor); - break; - } - } } - }); - } ! AddressIF pickGossipNode () { ! // TODO ask our ncClient if it has a preferred gossip node ! // if not, use somebody from our neighbor set ! return getGossipNode(); ! } ! } ! class ProxyClient extends CoordClient { ! final AddressIF addr; ! long lease; ! void update () { ! AddressIF neighbor = pickGossipNode (); ! // ask him to ping our address ! // and tell us about him ! // update our coordinate ! } ! public ProxyClient (AddressIF _addr, long _lease) { ! super (); ! renewLease (_lease); ! // TODO PRP does this need to be cloned? ! //addr = _addr.clone(); ! addr = _addr; ! } ! public void renewLease (long _lease) { ! if (_lease <= 0) { ! log.warn ("Changing given lease "+_lease+" to default lease "+ ! DEFAULT_PROXY_LEASE); ! _lease = DEFAULT_PROXY_LEASE; ! } ! lease = System.currentTimeMillis() + _lease; ! if (lease < 0) lease = Long.MAX_VALUE; ! } ! public boolean hasExpired () { ! if (System.currentTimeMillis() > lease) { ! return true; ! } ! return false; } - } } --- 1,322 ---- package edu.harvard.syrah.pyxida.nc; + import java.util.Arrays; import java.util.HashSet; + import java.util.Map; import java.util.Set; import edu.harvard.syrah.prp.Log; + import edu.harvard.syrah.pyxida.nc.lib.Coordinate; import edu.harvard.syrah.pyxida.nc.lib.NCClient; import edu.harvard.syrah.pyxida.ping.PingManager; + import edu.harvard.syrah.sbon.async.CBResult; + import edu.harvard.syrah.sbon.async.Config; + import edu.harvard.syrah.sbon.async.EventLoop; + import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; + import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; + import edu.harvard.syrah.sbon.comm.AddressFactory; import edu.harvard.syrah.sbon.comm.AddressIF; ! import edu.harvard.syrah.sbon.comm.obj.ObjCommCB; ! import edu.harvard.syrah.sbon.comm.obj.ObjCommIF; ! import edu.harvard.syrah.sbon.comm.obj.ObjMessage; ! import edu.harvard.syrah.sbon.comm.obj.ObjMessageIF; // TODO add proxy coordinates public class NCManager { ! private static final Log log = new Log(NCManager.class); ! // Number of dimensions ! private static final int NC_NUM_DIMS = Integer.parseInt(Config.getProperty("pyxida.dimensions", "5")); ! ! // Height is set within NCClient and is the "last dimension" ! // of the coordinates array. ! // Thus, setting this to 5 is 4d+height if height is in use. ! // This allows the size on the wire to remain constant ! // which is important for Azureus integration. ! // TODO JTL: You need to test the regex below. It should work ! private String bootstrapList[] = Config.getProperty("pyxida.bootstraplist", "glenora, sb01, sb02, sb10, sb11").split("[,\\s]"); ! ! public static final long UPDATE_DELAY = 10 * 1000; // 10 seconds ! public static final long DEFAULT_PROXY_LEASE = 3600 * 1000; ! private PingManager pingManager = null; ! private Set<ProxyClient> proxyClients = new HashSet<ProxyClient>(); ! final ObjCommIF comm; ! final CoordClient localNC; ! public NCManager(ObjCommIF _comm, PingManager pingManager) { ! comm = _comm; ! this.pingManager = pingManager; ! // Initialise the local coord first ! localNC = new CoordClient(); ! } ! public void init(final CB0 cbDone) { ! comm.registerMessageCB(GossipRequestMsg.class, new GossipHandler()); ! AddressFactory.createResolved(Arrays.asList(bootstrapList), new CB1<Map<String, AddressIF>>() { ! protected void cb(CBResult result, Map<String, AddressIF> addrMap) { ! switch (result.state) { ! case OK: { ! for (String remoteNode : addrMap.keySet()) { ! AddressIF remoteAddr = addrMap.get(remoteNode); ! // TODO JTL: add remoteAddr to your data structure ! } ! localNC.init(); ! // Also ping for any proxy coordinates. ! cbDone.callOK(); ! break; ! } ! case TIMEOUT: ! case ERROR: { ! log.error("Could not resolve bootstrap list: " + result.what); ! break; ! } ! } ! } ! }); ! } ! public void createProxyCoord(AddressIF remoteNode, long lease) { ! // TODO ! } ! public void destroyProxyCoord(AddressIF remoteNode) { ! // TODO ! } ! public Coordinate getLocalCoord() { ! return localNC.nc.getSystemCoords(); ! } ! public double getLocalError() { ! return localNC.nc.getSystemError(); ! } ! public Coordinate getProxyCoord(AddressIF remoteNode) { ! // TODO ! return null; ! } ! public double estimateRTT(AddressIF nodeA, AddressIF nodeB) { ! // TODO ! return 0; ! } ! /* ! // TODO PRP low priority ! public void startUp(DataInputStream is) throws IOException { ! // TODO ! } ! public void shutDown(DataOutputStream os) throws IOException { ! // TODO ! } ! */ ! abstract class ResponseObjCommCB<T extends ObjMessageIF> extends ObjCommCB<T> { ! void sendResponseMessage(final String handler, final AddressIF remoteAddr, ! final ObjMessage response, long requestMsgId, final String errorMessage, ! final CB1<Boolean> cbHandled) { ! CBResult result = null; ! if (errorMessage != null) { ! log.debug(handler + " :" + errorMessage); ! result = CBResult.ERROR(errorMessage); ! } else { ! result = CBResult.OK(); ! } ! comm.sendResponseMessage(response, remoteAddr, requestMsgId, new CB0() { ! protected void cb(CBResult sendResult) { ! switch (sendResult.state) { ! case TIMEOUT: ! case ERROR: { ! log.warn(handler + ": " + sendResult.what); ! return; ! } ! } ! } ! }); ! cbHandled.call(CBResult.OK(), true); ! } ! } + class GossipHandler extends ResponseObjCommCB<GossipRequestMsg> { ! public void cb(CBResult result, GossipRequestMsg msg, AddressIF remoteAddr, Long ts, ! final CB1<Boolean> cbHandled) { ! log.debug("in GossipHandler cb: " + msg); ! long curr_time = System.currentTimeMillis(); ! sendResponseMessage("Gossip", remoteAddr, ! new GossipResponseMsg(localNC.nc.getSystemCoords(), localNC.nc.getSystemError(), ! localNC.nc.getAge(curr_time), getGossipNodes(msg.coordinate)), msg.getMsgId(), null, ! cbHandled); ! ! } } ! Set<AddressIF> getGossipNodes(Coordinate remoteCoord) { ! // TODO return coordinates, possibly based on this guy's coordinate ! return null; } ! void addGossipNodes(Set<AddressIF> nodes) { ! // TODO add nodes to the set that we know about ! } ! AddressIF getGossipNode() { ! // TODO return a guy to gossip with ! return null; ! } ! class CoordClient { ! final NCClient<AddressIF> nc; ! public CoordClient() { ! nc = new NCClient<AddressIF>(NCManager.NC_NUM_DIMS); ! } ! public void init() { ! // TODO continuous timer ! // really would prefer randomly distributed timings ! // or an adaptive delay :) ! // ! // TODO JTL ! // ! // You can easily do this by varying UPDATE_DELAY. Remember that the semantics ! // of the call is that you have to re-register after each callback. ! EventLoop.get().registerTimerCB(UPDATE_DELAY, new CB0() { ! protected void cb(CBResult result) { ! update(); ! } ! }); ! } ! void update() { ! final AddressIF neighbor = pickGossipNode(); ! // send him a gossip msg ! // TODO could bias which nodes are sent based on his coord ! GossipRequestMsg msg = new GossipRequestMsg(localNC.nc.getSystemCoords(), ! getGossipNodes(null)); ! comm.sendRequestMessage(msg, neighbor, new ObjCommCB<GossipResponseMsg>() { ! protected void cb(CBResult result, final GossipResponseMsg responseMsg, ! AddressIF remoteAddr, Long ts, CB1<Boolean> cbHandled) { ! // TODO can use time of this instead of ping time ! // if we want to not use jpcap pings ! // (running not at root) ! // ! // TODO JTL: good idea. however, i'm not sure what the best way is to integrate this... ! // Ideally, all latency measurements should be done by the ping manager...? ! ! switch (result.state) { ! case OK: { ! // keep track of new guys he's told us about ! addGossipNodes(responseMsg.nodes); ! ! // and ping him ! pingManager.addPingRequest(comm.getLocalAddress(), neighbor, new CB1<Float>() { ! protected void cb(CBResult pingResult, Float latency) { ! switch (pingResult.state) { ! case OK: { ! // and update our coordinate ! ! // TODO convert nclib to use floats ! // ! // TODO JTL: why do you want to use floats and not double? won't we run into ! // accuracy problems? ! long lat_ms = (long) Math.round(latency); ! long curr_time = System.currentTimeMillis(); ! ! localNC.nc.processSample(neighbor, responseMsg.remoteCoordinate, ! responseMsg.remoteError, lat_ms, responseMsg.remoteAge, curr_time, true); ! ! break; ! } ! case TIMEOUT: ! case ERROR: { ! log.warn("Ping to " + neighbor + " failed"); ! break; ! } ! } ! } ! }); ! ! break; ! } ! case ERROR: ! case TIMEOUT: { ! log.warn("Did not receive gossip response from " + neighbor); ! break; ! } ! } } }); } ! AddressIF pickGossipNode() { ! // TODO ask our ncClient if it has a preferred gossip node ! // if not, use somebody from our neighbor set ! return getGossipNode(); ! } ! } ! class ProxyClient extends CoordClient { ! final AddressIF addr; ! long lease; ! void update() { ! AddressIF neighbor = pickGossipNode(); ! // ask him to ping our address ! // and tell us about him ! // update our coordinate ! } ! public ProxyClient(AddressIF _addr, long _lease) { ! super(); ! renewLease(_lease); ! // TODO Does this need to be cloned? ! // prp: Don't know but if you want to clone it, use the copy constructor ! addr = AddressFactory.create(_addr); ! } ! public void renewLease(long _lease) { ! if (_lease <= 0) { ! log.warn("Changing given lease " + _lease + " to default lease " + DEFAULT_PROXY_LEASE); ! _lease = DEFAULT_PROXY_LEASE; ! } ! lease = System.currentTimeMillis() + _lease; ! if (lease < 0) ! lease = Long.MAX_VALUE; ! } ! public boolean hasExpired() { ! if (System.currentTimeMillis() > lease) { ! return true; ! } ! return false; ! } } } Index: GossipResponseMsg.java =================================================================== RCS file: /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/nc/GossipResponseMsg.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** GossipResponseMsg.java 27 Nov 2006 04:06:37 -0000 1.2 --- GossipResponseMsg.java 27 Nov 2006 11:10:48 -0000 1.3 *************** *** 2,9 **** import java.util.Set; ! import edu.harvard.syrah.sbon.comm.obj.ObjMessage; ! import edu.harvard.syrah.pyxida.nc.lib.*; ! import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.comm.AddressIF; /** --- 2,9 ---- import java.util.Set; ! ! import edu.harvard.syrah.pyxida.nc.lib.Coordinate; import edu.harvard.syrah.sbon.comm.AddressIF; + import edu.harvard.syrah.sbon.comm.obj.ObjMessage; /** Index: GossipRequestMsg.java =================================================================== RCS file: /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/nc/GossipRequestMsg.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** GossipRequestMsg.java 25 Nov 2006 21:39:59 -0000 1.1 --- GossipRequestMsg.java 27 Nov 2006 11:10:48 -0000 1.2 *************** *** 2,9 **** import java.util.Set; ! import edu.harvard.syrah.sbon.comm.obj.ObjMessage; ! import edu.harvard.syrah.pyxida.nc.lib.*; ! import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.sbon.comm.AddressIF; /** --- 2,9 ---- import java.util.Set; ! ! import edu.harvard.syrah.pyxida.nc.lib.Coordinate; import edu.harvard.syrah.sbon.comm.AddressIF; + import edu.harvard.syrah.sbon.comm.obj.ObjMessage; /** |