|
From: Peter P. <pr...@us...> - 2006-11-27 20:31:48
|
Update of /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/nc In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv16925/src/edu/harvard/syrah/pyxida/nc Modified Files: NCManager.java Log Message: Tried to fix the null cb bug with request/response msgs. Index: NCManager.java =================================================================== RCS file: /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/nc/NCManager.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** NCManager.java 27 Nov 2006 19:03:57 -0000 1.9 --- NCManager.java 27 Nov 2006 20:31:40 -0000 1.10 *************** *** 16,23 **** import edu.harvard.syrah.sbon.comm.AddressFactory; import edu.harvard.syrah.sbon.comm.AddressIF; ! import edu.harvard.syrah.sbon.comm.obj.ObjCommCB; ! import edu.harvard.syrah.sbon.comm.obj.ObjCommIF; ! import edu.harvard.syrah.sbon.comm.obj.ObjMessage; ! import edu.harvard.syrah.sbon.comm.obj.ObjMessageIF; //MEDTODO add proxy coordinates --- 16,20 ---- import edu.harvard.syrah.sbon.comm.AddressFactory; import edu.harvard.syrah.sbon.comm.AddressIF; ! import edu.harvard.syrah.sbon.comm.obj.*; //MEDTODO add proxy coordinates *************** *** 30,168 **** */ public class NCManager { ! private static final Log log = new Log(NCManager.class); ! // Number of dimensions ! private static final int NC_NUM_DIMS = Integer.parseInt(Config.getProperty("pyxida.dimensions", "5")); ! // Height is set within NCClient and is the "last dimension" ! // of the coordinates array. ! // Thus, setting this to 5 is 4d+height if height is in use. ! // This allows the size on the wire to remain constant ! // which is important for Azureus integration. ! // JTLTODO: You need to test the regex below. It should work ! private String bootstrapList[] = Config.getProperty("pyxida.bootstraplist", "glenora.eecs.harvard.edu sb01.eecs.harvard.edu sb02.eecs.harvard.edu sb10.eecs.harvard.edu sb11.eecs.harvard.edu").split("[\\s]"); ! /** ! * Time between gossip messages to coordinate neighbors. ! * Default is 10 seconds. ! */ ! public static final long UPDATE_DELAY = 10 * 1000; ! /** ! * Default lifetime that proxy coordinates are managed for. ! * Set to one hour. ! */ ! public static final long DEFAULT_PROXY_LEASE = 3600 * 1000; ! private PingManager pingManager = null; ! private Set<ProxyClient> proxyClients = new HashSet<ProxyClient>(); ! final ObjCommIF comm; ! final CoordClient localNC; ! Set<AddressIF> upNeighbors; ! Set<AddressIF> downNeighbors; ! Set<AddressIF> pendingNeighbors; ! /** ! * Create a coordinate manager. ! * Does not block. ! */ ! public NCManager(ObjCommIF _comm, PingManager pingManager) { ! comm = _comm; ! this.pingManager = pingManager; ! // Initialise the local coord first ! localNC = new CoordClient(); ! } ! /** ! * Asynchronous initialization of coordinate manager. ! * Resolves bootstrap neighbors and starts gossip for local coordinate. ! * Starts listening for gossip messages. ! */ ! public void init(final CB0 cbDone) { ! comm.registerMessageCB(GossipRequestMsg.class, new GossipHandler()); ! upNeighbors = new HashSet<AddressIF>(); ! downNeighbors = new HashSet<AddressIF>(); ! pendingNeighbors = new HashSet<AddressIF>(); ! log.debug("Resolving bootstrap list"); ! AddressFactory.createResolved(Arrays.asList(bootstrapList), Pyxida.COMM_PORT, new CB1<Map<String, AddressIF>>() { ! protected void cb(CBResult result, Map<String, AddressIF> addrMap) { ! switch (result.state) { ! case OK: { ! for (String remoteNode : addrMap.keySet()) { ! log.debug("remoteNode='" + remoteNode + "'"); ! AddressIF remoteAddr = addrMap.get(remoteNode); ! addPendingNeighbor(remoteAddr); ! } ! // Starts local coordinate timer ! localNC.init(); ! // MEDTODO Start periodic cleaner ! //neighborClean(); ! cbDone.callOK(); ! break; ! } ! case TIMEOUT: ! case ERROR: { ! log.error("Could not resolve bootstrap list: " + result.what); ! break; ! } ! } ! } ! }); ! } ! /** ! * Not implemented yet. ! */ ! public void createProxyCoord(AddressIF remoteNode, long lease) { ! // LOWTODO ! } ! /** ! * Not implemented yet. ! */ ! public void destroyProxyCoord(AddressIF remoteNode) { ! // LOWTODO ! } ! /** ! * @return local coordinate ! */ ! public Coordinate getLocalCoord() { ! return localNC.nc.getSystemCoords(); ! } ! /** ! * @return local error ! */ ! public double getLocalError() { ! return localNC.nc.getSystemError(); ! } ! /** ! * Not implemented yet. ! */ ! public Coordinate getProxyCoord(AddressIF remoteNode) { ! // LOWTODO ! return null; ! } ! /** ! * ! */ ! public double estimateRTT(AddressIF nodeA, AddressIF nodeB) { ! // TODO ! return 0; ! } ! /* // LOWTODO PRP public void startUp(DataInputStream is) throws IOException { --- 27,165 ---- */ public class NCManager { ! private static final Log log = new Log(NCManager.class); ! // Number of dimensions ! private static final int NC_NUM_DIMS = Integer.parseInt(Config.getProperty("pyxida.dimensions", "5")); ! // Height is set within NCClient and is the "last dimension" ! // of the coordinates array. ! // Thus, setting this to 5 is 4d+height if height is in use. ! // This allows the size on the wire to remain constant ! // which is important for Azureus integration. ! // JTLTODO: You need to test the regex below. It should work ! private String bootstrapList[] = Config.getProperty("pyxida.bootstraplist", "glenora.eecs.harvard.edu sb01.eecs.harvard.edu sb02.eecs.harvard.edu sb10.eecs.harvard.edu sb11.eecs.harvard.edu").split("[\\s]"); ! /** ! * Time between gossip messages to coordinate neighbors. ! * Default is 10 seconds. ! */ ! public static final long UPDATE_DELAY = 10 * 1000; ! /** ! * Default lifetime that proxy coordinates are managed for. ! * Set to one hour. ! */ ! public static final long DEFAULT_PROXY_LEASE = 3600 * 1000; ! private PingManager pingManager = null; ! private Set<ProxyClient> proxyClients = new HashSet<ProxyClient>(); ! final ObjCommIF comm; ! final CoordClient localNC; ! Set<AddressIF> upNeighbors; ! Set<AddressIF> downNeighbors; ! Set<AddressIF> pendingNeighbors; ! /** ! * Create a coordinate manager. ! * Does not block. ! */ ! public NCManager(ObjCommIF _comm, PingManager pingManager) { ! comm = _comm; ! this.pingManager = pingManager; ! // Initialise the local coord first ! localNC = new CoordClient(); ! } ! /** ! * Asynchronous initialization of coordinate manager. ! * Resolves bootstrap neighbors and starts gossip for local coordinate. ! * Starts listening for gossip messages. ! */ ! public void init(final CB0 cbDone) { ! comm.registerMessageCB(GossipRequestMsg.class, new GossipHandler()); ! upNeighbors = new HashSet<AddressIF>(); ! downNeighbors = new HashSet<AddressIF>(); ! pendingNeighbors = new HashSet<AddressIF>(); ! log.debug("Resolving bootstrap list"); ! AddressFactory.createResolved(Arrays.asList(bootstrapList), Pyxida.COMM_PORT, new CB1<Map<String, AddressIF>>() { ! protected void cb(CBResult result, Map<String, AddressIF> addrMap) { ! switch (result.state) { ! case OK: { ! for (String remoteNode : addrMap.keySet()) { ! log.debug("remoteNode='" + remoteNode + "'"); ! AddressIF remoteAddr = addrMap.get(remoteNode); ! addPendingNeighbor(remoteAddr); ! } ! // Starts local coordinate timer ! localNC.init(); ! // MEDTODO Start periodic cleaner ! //neighborClean(); ! cbDone.callOK(); ! break; ! } ! case TIMEOUT: ! case ERROR: { ! log.error("Could not resolve bootstrap list: " + result.what); ! break; ! } ! } ! } ! }); ! } ! /** ! * Not implemented yet. ! */ ! public void createProxyCoord(AddressIF remoteNode, long lease) { ! // LOWTODO ! } ! /** ! * Not implemented yet. ! */ ! public void destroyProxyCoord(AddressIF remoteNode) { ! // LOWTODO ! } ! /** ! * @return local coordinate ! */ ! public Coordinate getLocalCoord() { ! return localNC.nc.getSystemCoords(); ! } ! /** ! * @return local error ! */ ! public double getLocalError() { ! return localNC.nc.getSystemError(); ! } ! /** ! * Not implemented yet. ! */ ! public Coordinate getProxyCoord(AddressIF remoteNode) { ! // LOWTODO ! return null; ! } ! /** ! * ! */ ! public double estimateRTT(AddressIF nodeA, AddressIF nodeB) { ! // TODO ! return 0; ! } ! /* // LOWTODO PRP public void startUp(DataInputStream is) throws IOException { *************** *** 171,438 **** public void shutDown(DataOutputStream os) throws IOException { } ! */ ! abstract class ResponseObjCommCB<T extends ObjMessageIF> extends ObjCommCB<T> { ! void sendResponseMessage(final String handler, final AddressIF remoteAddr, ! final ObjMessage response, long requestMsgId, final String errorMessage, ! final CB1<Boolean> cbHandled) { ! CBResult result = null; ! if (errorMessage != null) { ! log.debug(handler + " :" + errorMessage); ! result = CBResult.ERROR(errorMessage); ! } else { ! result = CBResult.OK(); } ! ! comm.sendResponseMessage(response, remoteAddr, requestMsgId, new CB0() { ! protected void cb(CBResult sendResult) { ! switch (sendResult.state) { ! case TIMEOUT: ! case ERROR: { ! log.warn(handler + ": " + sendResult.what); ! return; ! } ! } ! } ! }); ! //cbHandled.call(CBResult.OK(), true); ! cbHandled.call(result, true); } } ! class GossipHandler extends ResponseObjCommCB<GossipRequestMsg> { ! public void cb(CBResult result, GossipRequestMsg msg, AddressIF remoteAddr, Long ts, ! final CB1<Boolean> cbHandled) { ! log.debug("in GossipHandler cb: " + msg); ! // we just heard from him so we know he is up ! addUpNeighbor(remoteAddr); ! long curr_time = System.currentTimeMillis(); ! sendResponseMessage("Gossip", remoteAddr, ! new GossipResponseMsg(localNC.nc.getSystemCoords(), localNC.nc.getSystemError(), ! localNC.nc.getAge(curr_time), getUpNeighbors(msg.coordinate)), msg.getMsgId(), null, ! cbHandled); - } } ! AddressIF getUpNeighbor() { ! if (upNeighbors.size() == 0 && pendingNeighbors.size() == 0) { ! log.warn("we are lonely and have no one to gossip with"); ! return null; ! } ! final double pctUsePendingNeighbor = 0.1; ! AddressIF upNeighbor; ! if (upNeighbors.size() == 0 || ! Pyxida.random.nextDouble() < pctUsePendingNeighbor) { ! upNeighbor = PUtil.getRandomObject(pendingNeighbors); ! } else { ! upNeighbor = PUtil.getRandomObject(upNeighbors); ! } ! ! return upNeighbor; } ! Set<AddressIF> getUpNeighbors(Coordinate remoteCoord) { ! Set<AddressIF> nodes = new HashSet<AddressIF>(); ! // LOWTODO add option of loop here ! AddressIF node = getUpNeighbor(); ! if (node != null) { ! nodes.add(node); ! } ! return nodes; ! } ! void addPendingNeighbors(Set<AddressIF> nodes) { ! for (AddressIF node : nodes) { ! addPendingNeighbor(node); ! } } ! // If this guy is in an unknown state ! // add him to pending. ! void addPendingNeighbor(AddressIF node) { ! if (node.equals(comm.getLocalAddress())) return; ! if (!pendingNeighbors.contains(node) && ! !upNeighbors.contains(node) && ! !downNeighbors.contains(node)) { ! pendingNeighbors.add(node); ! } } ! void addUpNeighbor(AddressIF node) { ! if (node.equals(comm.getLocalAddress())) return; ! downNeighbors.remove(node); ! pendingNeighbors.remove(node); ! upNeighbors.add(node); } ! void addDownNeighbor(AddressIF node) { ! pendingNeighbors.remove(node); ! upNeighbors.remove(node); ! downNeighbors.add(node); ! } ! class CoordClient { ! final NCClient<AddressIF> nc; ! final CB0 updateCB; ! public CoordClient() { ! nc = new NCClient<AddressIF>(NCManager.NC_NUM_DIMS); ! updateCB = new CB0() { ! protected void cb(CBResult result) { ! update(); ! } ! }; } ! void registerTimer () { ! // LOWTODO adaptive delay ! double rnd = Pyxida.random.nextGaussian(); ! long delay = UPDATE_DELAY + (long)(1000 * rnd); ! ! log.debug ("setting timer to "+delay); ! EventLoop.get().registerTimerCB(delay, updateCB); ! } ! public void init() { ! registerTimer(); ! } ! void update() { ! registerTimer(); ! final AddressIF neighbor = pickGossipNode(); ! // send him a gossip msg ! // LOWTODO could bias which nodes are sent based on his coord ! GossipRequestMsg msg = new GossipRequestMsg(localNC.nc.getSystemCoords(), ! getUpNeighbors(null)); ! comm.sendRequestMessage(msg, neighbor, new ObjCommCB<GossipResponseMsg>() { ! protected void cb(CBResult result, final GossipResponseMsg responseMsg, ! AddressIF remoteAddr, Long ts, CB1<Boolean> cbHandled) { ! // LOWTODO can use time of this instead of ping time ! // if we want to not use jpcap pings ! // (running not at root) ! // ! // However however PRP not sure what the best way is to integrate this... ! // Ideally, all latency measurements should be done by the ping manager...? ! assert (cbHandled != null); ! switch (result.state) { ! case OK: { ! // keep track of new guys he's told us about ! addPendingNeighbors(responseMsg.nodes); ! // and ping him ! pingManager.addPingRequest(comm.getLocalAddress(), neighbor, new CB1<Float>() { ! protected void cb(CBResult pingResult, Float latency) { ! switch (pingResult.state) { ! case OK: { ! // both calls worked ! addUpNeighbor(neighbor); ! // MEDTODO convert nclib to use floats ! // ! // JTLTODO: why do you want to use floats and not double? won't we run into ! // accuracy problems? ! long lat_ms = (long) Math.round(latency); ! long curr_time = System.currentTimeMillis(); ! // and update our coordinate ! localNC.nc.processSample(neighbor, responseMsg.remoteCoordinate, ! responseMsg.remoteError, lat_ms, responseMsg.remoteAge, curr_time, true); ! log.debug("update: "+localNC.nc); ! break; ! } ! case TIMEOUT: ! case ERROR: { ! log.warn("Ping to " + neighbor + " failed"); ! addDownNeighbor(neighbor); ! break; ! } ! } ! } ! }); ! break; ! } ! case ERROR: ! case TIMEOUT: { ! log.warn("Did not receive gossip response from " + neighbor); ! addDownNeighbor(neighbor); ! break; ! } ! } ! assert (cbHandled != null); ! cbHandled.call(CBResult.OK(), true); - } - }); } ! AddressIF pickGossipNode() { ! // LOWTODO ask our ncClient if it has a preferred gossip node ! ! // if not, use somebody from our neighbor set ! return getUpNeighbor(); ! } } ! class ProxyClient extends CoordClient { ! final AddressIF addr; ! long lease; ! void update() { ! registerTimer(); ! final AddressIF neighbor = pickGossipNode(); ! // ask him to ping our address ! // and tell us about him ! // update our coordinate ! } ! public ProxyClient(AddressIF _addr, long _lease) { ! super(); ! renewLease(_lease); ! // MEDTODO Does this need to be cloned? ! // prp: Don't know but if you want to clone it, use the copy constructor ! addr = AddressFactory.create(_addr); ! } ! public void renewLease(long _lease) { ! if (_lease <= 0) { ! log.warn("Changing given lease " + _lease + " to default lease " + DEFAULT_PROXY_LEASE); ! _lease = DEFAULT_PROXY_LEASE; ! } ! lease = System.currentTimeMillis() + _lease; ! if (lease < 0) ! lease = Long.MAX_VALUE; ! } ! public boolean hasExpired() { ! if (System.currentTimeMillis() > lease) { ! return true; ! } ! return false; ! } } } --- 168,429 ---- public void shutDown(DataOutputStream os) throws IOException { } ! */ ! abstract class ResponseObjCommCB<T extends ObjMessageIF> extends ObjCommCB<T> { ! void sendResponseMessage(final String handler, final AddressIF remoteAddr, ! final ObjMessage response, long requestMsgId, final String errorMessage, ! final CB1<Boolean> cbHandled) { ! ! CBResult result = null; ! if (errorMessage != null) { ! log.debug(handler + " :" + errorMessage); ! result = CBResult.ERROR(errorMessage); ! } else { ! result = CBResult.OK(); ! } ! comm.sendResponseMessage(response, remoteAddr, requestMsgId, new CB0() { ! protected void cb(CBResult sendResult) { ! switch (sendResult.state) { ! case TIMEOUT: ! case ERROR: { ! log.warn(handler + ": " + sendResult.what); ! return; } ! } } + }); + cbHandled.call(result, true); } + } ! class GossipHandler extends ResponseObjCommCB<GossipRequestMsg> { ! public void cb(CBResult result, GossipRequestMsg msg, AddressIF remoteAddr, Long ts, ! final CB1<Boolean> cbHandled) { ! log.debug("in GossipHandler cb: " + msg); ! // we just heard from him so we know he is up ! addUpNeighbor(remoteAddr); ! long curr_time = System.currentTimeMillis(); ! sendResponseMessage("Gossip", remoteAddr, ! new GossipResponseMsg(localNC.nc.getSystemCoords(), localNC.nc.getSystemError(), ! localNC.nc.getAge(curr_time), getUpNeighbors(msg.coordinate)), msg.getMsgId(), null, ! cbHandled); } + } ! AddressIF getUpNeighbor() { ! if (upNeighbors.size() == 0 && pendingNeighbors.size() == 0) { ! log.warn("we are lonely and have no one to gossip with"); ! return null; ! } ! final double pctUsePendingNeighbor = 0.1; ! AddressIF upNeighbor; ! if (upNeighbors.size() == 0 || ! Pyxida.random.nextDouble() < pctUsePendingNeighbor) { ! upNeighbor = PUtil.getRandomObject(pendingNeighbors); ! } else { ! upNeighbor = PUtil.getRandomObject(upNeighbors); } ! return upNeighbor; ! } ! Set<AddressIF> getUpNeighbors(Coordinate remoteCoord) { ! Set<AddressIF> nodes = new HashSet<AddressIF>(); ! // LOWTODO add option of loop here ! AddressIF node = getUpNeighbor(); ! if (node != null) { ! nodes.add(node); } + return nodes; + } ! void addPendingNeighbors(Set<AddressIF> nodes) { ! for (AddressIF node : nodes) { ! addPendingNeighbor(node); } + } ! // If this guy is in an unknown state ! // add him to pending. ! void addPendingNeighbor(AddressIF node) { ! if (node.equals(comm.getLocalAddress())) return; ! if (!pendingNeighbors.contains(node) && ! !upNeighbors.contains(node) && ! !downNeighbors.contains(node)) { ! pendingNeighbors.add(node); } + } ! void addUpNeighbor(AddressIF node) { ! if (node.equals(comm.getLocalAddress())) return; ! downNeighbors.remove(node); ! pendingNeighbors.remove(node); ! upNeighbors.add(node); ! } ! void addDownNeighbor(AddressIF node) { ! pendingNeighbors.remove(node); ! upNeighbors.remove(node); ! downNeighbors.add(node); ! } ! class CoordClient { ! final NCClient<AddressIF> nc; ! final CB0 updateCB; ! public CoordClient() { ! nc = new NCClient<AddressIF>(NCManager.NC_NUM_DIMS); + updateCB = new CB0() { + protected void cb(CBResult result) { + update(); } + }; ! } ! void registerTimer () { ! // LOWTODO adaptive delay ! double rnd = Pyxida.random.nextGaussian(); ! long delay = UPDATE_DELAY + (long)(1000 * rnd); ! log.debug ("setting timer to "+delay); ! EventLoop.get().registerTimerCB(delay, updateCB); ! } ! public void init() { ! registerTimer(); ! } ! void update() { ! registerTimer(); ! final AddressIF neighbor = pickGossipNode(); ! // send him a gossip msg ! // LOWTODO could bias which nodes are sent based on his coord ! GossipRequestMsg msg = new GossipRequestMsg(localNC.nc.getSystemCoords(), ! getUpNeighbors(null)); ! comm.sendRequestMessage(msg, neighbor, new ObjCommRRCB<GossipResponseMsg>() { ! protected void cb(CBResult result, final GossipResponseMsg responseMsg, ! AddressIF remoteAddr, Long ts) { ! // LOWTODO can use time of this instead of ping time ! // if we want to not use jpcap pings ! // (running not at root) ! // ! // However however PRP not sure what the best way is to integrate this... ! // Ideally, all latency measurements should be done by the ping manager...? ! switch (result.state) { ! case OK: { ! // keep track of new guys he's told us about ! addPendingNeighbors(responseMsg.nodes); ! // and ping him ! pingManager.addPingRequest(comm.getLocalAddress(), neighbor, new CB1<Float>() { ! protected void cb(CBResult pingResult, Float latency) { ! switch (pingResult.state) { ! case OK: { ! // both calls worked ! addUpNeighbor(neighbor); ! // MEDTODO convert nclib to use floats ! // ! // JTLTODO: why do you want to use floats and not double? won't we run into ! // accuracy problems? ! long lat_ms = (long) Math.round(latency); ! long curr_time = System.currentTimeMillis(); ! // and update our coordinate ! localNC.nc.processSample(neighbor, responseMsg.remoteCoordinate, ! responseMsg.remoteError, lat_ms, responseMsg.remoteAge, curr_time, true); ! log.debug("update: "+localNC.nc); ! break; ! } ! case TIMEOUT: ! case ERROR: { ! log.warn("Ping to " + neighbor + " failed"); ! addDownNeighbor(neighbor); ! break; ! } ! } ! } ! }); ! break; ! } ! case ERROR: ! case TIMEOUT: { ! log.warn("Did not receive gossip response from " + neighbor); ! addDownNeighbor(neighbor); ! break; ! } ! } } + }); + } ! AddressIF pickGossipNode() { ! // LOWTODO ask our ncClient if it has a preferred gossip node ! // if not, use somebody from our neighbor set ! return getUpNeighbor(); } ! } ! class ProxyClient extends CoordClient { ! final AddressIF addr; ! long lease; ! void update() { ! registerTimer(); ! final AddressIF neighbor = pickGossipNode(); ! // ask him to ping our address ! // and tell us about him ! // update our coordinate ! } ! public ProxyClient(AddressIF _addr, long _lease) { ! super(); ! renewLease(_lease); ! // MEDTODO Does this need to be cloned? ! // prp: Don't know but if you want to clone it, use the copy constructor ! addr = AddressFactory.create(_addr); ! } ! public void renewLease(long _lease) { ! if (_lease <= 0) { ! log.warn("Changing given lease " + _lease + " to default lease " + DEFAULT_PROXY_LEASE); ! _lease = DEFAULT_PROXY_LEASE; ! } ! lease = System.currentTimeMillis() + _lease; ! if (lease < 0) ! lease = Long.MAX_VALUE; } + public boolean hasExpired() { + if (System.currentTimeMillis() > lease) { + return true; + } + return false; + } + } + } |