|
From: Peter P. <pr...@us...> - 2006-12-07 11:46:19
|
Update of /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/ping In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv27417/src/edu/harvard/syrah/pyxida/ping Modified Files: TCPSynPinger.java ICMPPinger.java JpcapPinger.java Log Message: Redesigned the Jpcap code to handle multiple concurrent pings Index: ICMPPinger.java =================================================================== RCS file: /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/ping/ICMPPinger.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** ICMPPinger.java 1 Dec 2006 18:10:54 -0000 1.10 --- ICMPPinger.java 7 Dec 2006 11:46:17 -0000 1.11 *************** *** 24,55 **** public void ping(AddressIF remoteNode, final CB1<Double> cbPing) throws UnsupportedOperationException { log.debug("Sending new ping to remoteNode=" + remoteNode + " using ICMP"); - final JpcapPingData pd = new JpcapPingData(); - pd.pingAddr = remoteNode; - pd.packetType = ICMPPacket.class; ! pd.cbDone = new CB0(PING_TIMEOUT) { ! protected void cb(CBResult result) { ! switch (result.state) { ! case OK : { ! log.debug("pd.recvPacket=" + pd.recvPacket); ! pd.recvTS = parseICMP(pd.recvPacket); ! assert pd.sendTS > 0; ! assert pd.recvTS > pd.sendTS : "send TS > recv TS ?"; ! long lat = pd.recvTS - pd.sendTS; ! log.debug("lat=" + lat + " pd.recvTS=" + pd.recvTS); ! cbPing.call(result, lat / 1000.0); ! break; ! } ! case TIMEOUT: ! case ERROR: { ! removeRequest(); ! cbPing.call(result, 0.0); ! break; ! } ! } ! } ! }; ! ! addRequest(pd); pd.sendPacket = createICMP(pd.pingAddr); --- 24,30 ---- public void ping(AddressIF remoteNode, final CB1<Double> cbPing) throws UnsupportedOperationException { log.debug("Sending new ping to remoteNode=" + remoteNode + " using ICMP"); ! JpcapPingData pd = addJpcapRequest(remoteNode, cbPing); ! pd.packetType = ICMPPacket.class; pd.sendPacket = createICMP(pd.pingAddr); *************** *** 91,95 **** } ! private long parseICMP(Packet p) { log.debug("p=" + p + " p.class=" + (p != null ? p.getClass() : null)); --- 66,70 ---- } ! protected long parsePacket(Packet p) { log.debug("p=" + p + " p.class=" + (p != null ? p.getClass() : null)); Index: TCPSynPinger.java =================================================================== RCS file: /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/ping/TCPSynPinger.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** TCPSynPinger.java 24 Nov 2006 13:58:57 -0000 1.7 --- TCPSynPinger.java 7 Dec 2006 11:46:17 -0000 1.8 *************** *** 1,8 **** package edu.harvard.syrah.pyxida.ping; ! import jpcap.packet.EthernetPacket; ! import jpcap.packet.IPPacket; ! import jpcap.packet.Packet; ! import jpcap.packet.TCPPacket; import edu.harvard.syrah.prp.ANSI; import edu.harvard.syrah.prp.Log; --- 1,5 ---- package edu.harvard.syrah.pyxida.ping; ! import jpcap.packet.*; import edu.harvard.syrah.prp.ANSI; import edu.harvard.syrah.prp.Log; *************** *** 27,60 **** public void ping(AddressIF remoteNode, final CB1<Double> cbPing) throws UnsupportedOperationException { log.debug("Sending new ping to remoteNode=" + remoteNode + ":" + PING_DST_PORT + " using TCPSyn"); ! final JpcapPingData pd = new JpcapPingData(); ! pd.pingAddr = remoteNode; ! ! pd.cbDone = new CB0(PING_TIMEOUT) { ! protected void cb(CBResult result) { ! switch (result.state) { ! case OK : { ! log.debug("pd.recvPacket=" + pd.recvPacket); ! processTCP(pd); ! assert pd.sendTS > 0; ! assert pd.recvTS > pd.sendTS : "send TS (" + pd.sendTS + ") > recv TS (" + pd.recvTS + ")?"; ! long lat = pd.recvTS - pd.sendTS; ! log.debug("lat=" + lat + " pd.recvTS=" + pd.recvTS); ! ! removeRequest(); ! cbPing.call(result, lat / 1000.0); ! break; ! } ! case TIMEOUT: ! case ERROR: { ! removeRequest(); ! cbPing.call(result, 0.0); ! break; ! } ! } ! } ! }; ! ! addRequest(pd); sendTCPSyn(pd); } --- 24,34 ---- public void ping(AddressIF remoteNode, final CB1<Double> cbPing) throws UnsupportedOperationException { log.debug("Sending new ping to remoteNode=" + remoteNode + ":" + PING_DST_PORT + " using TCPSyn"); ! JpcapPingData pd = addJpcapRequest(remoteNode, cbPing); ! pd.packetType = TCPPacket.class; sendTCPSyn(pd); + + /* + * We don't need to send a rst here because the kernel does it for us. + */ } *************** *** 102,119 **** */ ! private void processTCP(JpcapPingData pd) { ! Packet p = pd.recvPacket; log.debug("p=" + p + " p.class=" + (p != null ? p.getClass() : null)); TCPPacket tcp = (TCPPacket) p; long sec = p.sec; long usec = p.usec; ! pd.recvTS = (sec * 1000 * 1000) + usec; ! log.debug("recvTS=" + pd.recvTS); ! ! /* ! * We don't need to send a rst here because the kernel does it for us. ! */ } --- 76,92 ---- */ ! protected long parsePacket(Packet p) { log.debug("p=" + p + " p.class=" + (p != null ? p.getClass() : null)); + long recvTS = Long.MIN_VALUE; TCPPacket tcp = (TCPPacket) p; long sec = p.sec; long usec = p.usec; ! log.debug("ICMP_ECHOREPLY: " + tcp.src_ip + " sec=" + sec + " usec=" + usec); ! recvTS = (sec * 1000 * 1000) + usec; ! log.debug("recvTS=" + recvTS); ! ! return recvTS; } Index: JpcapPinger.java =================================================================== RCS file: /cvsroot/pyxida/Pyxida/src/edu/harvard/syrah/pyxida/ping/JpcapPinger.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** JpcapPinger.java 1 Dec 2006 18:10:54 -0000 1.5 --- JpcapPinger.java 7 Dec 2006 11:46:17 -0000 1.6 *************** *** 6,10 **** import java.net.InetAddress; import java.net.MalformedURLException; ! import java.util.Arrays; import jpcap.JpcapCaptor; --- 6,10 ---- import java.net.InetAddress; import java.net.MalformedURLException; ! import java.util.*; import jpcap.JpcapCaptor; *************** *** 17,22 **** --- 17,24 ---- import edu.harvard.syrah.prp.Log; import edu.harvard.syrah.prp.POut; + import edu.harvard.syrah.sbon.async.CBResult; import edu.harvard.syrah.sbon.async.EventLoop; import edu.harvard.syrah.sbon.async.CallbacksIF.CB0; + import edu.harvard.syrah.sbon.async.CallbacksIF.CB1; import edu.harvard.syrah.sbon.comm.AddressFactory; import edu.harvard.syrah.sbon.comm.AddressIF; *************** *** 56,60 **** } ! private static JpcapPingData currentPing; public void init(AddressIF defaultPingAddr, final CB0 cbDone) { --- 58,62 ---- } ! private static List<JpcapPingData> currentPingList = Collections.synchronizedList(new ArrayList<JpcapPingData>()); public void init(AddressIF defaultPingAddr, final CB0 cbDone) { *************** *** 170,180 **** protected void addRequest(JpcapPingData pd) { - // Assume that there's not other ping that is waiting for a response - assert JpcapPinger.currentPing == null; - JpcapPinger.currentPing = pd; } ! protected void removeRequest() { ! JpcapPinger.currentPing = null; } --- 172,180 ---- protected void addRequest(JpcapPingData pd) { } ! protected void removeRequest(JpcapPingData pd) { ! boolean found = currentPingList.remove(pd); ! assert found : "PingData pd=" + pd + " not found in list: " + currentPingList; } *************** *** 183,204 **** //captor.setFilter("icmp", true); ! while (!jpcapThread.isInterrupted()) { ! IPPacket ip = null; ! do { ! /* ! * TODO is this a busy wait? ! */ ! log.debug("Waiting for a packet..."); ! ip = (IPPacket) captor.getPacket(); ! log.debug("Captured a packet: ip=" + ip + " src=" + ip.src_ip); ! } while (currentPing == null || ip.getClass() != currentPing.packetType || !ip.src_ip.equals(currentPing.pingAddr.getInetAddress())); ! ! currentPing.recvPacket = ip; ! ! log.debug("Received packet: " + currentPing.recvPacket); ! EventLoop.get().registerTimerCB(currentPing.cbDone); ! currentPing = null; } } } --- 183,243 ---- //captor.setFilter("icmp", true); ! while (!jpcapThread.isInterrupted()) { ! IPPacket ip = null; ! while(true) { ! /* ! * TODO is this a busy wait? ! */ ! log.debug("Waiting for a packet..."); ! ip = (IPPacket) captor.getPacket(); ! log.debug("Captured a packet from src=" + ip.src_ip); ! ! synchronized(currentPingList) { ! for (Iterator<JpcapPingData> it = currentPingList.iterator(); it.hasNext();) { ! JpcapPingData currentPing = it.next(); ! if (ip.getClass() == currentPing.packetType && ip.src_ip.equals(currentPing.pingAddr.getInetAddress())) { ! it.remove(); ! currentPing.recvPacket = ip; ! log.debug("Found a matching ping request"); ! EventLoop.get().registerTimerCB(currentPing.cbDone); ! } ! } ! } ! } ! } ! } ! ! public JpcapPingData addJpcapRequest(AddressIF remoteNode, final CB1<Double> cbPing) { ! final JpcapPingData pd = new JpcapPingData(); ! pd.pingAddr = remoteNode; ! ! pd.cbDone = new CB0(PING_TIMEOUT) { ! protected void cb(CBResult result) { ! switch (result.state) { ! case OK : { ! log.debug("pd.recvPacket=" + pd.recvPacket); ! pd.recvTS = parsePacket(pd.recvPacket); ! assert pd.sendTS > 0; ! assert pd.recvTS > pd.sendTS : "send TS > recv TS ?"; ! long lat = pd.recvTS - pd.sendTS; ! log.debug("lat=" + lat + " pd.recvTS=" + pd.recvTS); ! cbPing.call(result, lat / 1000.0); ! break; ! } ! case TIMEOUT: ! case ERROR: { ! removeRequest(pd); ! cbPing.call(result, 0.0); ! break; ! } } + } + }; + + currentPingList.add(pd); + return pd; } + protected abstract long parsePacket(Packet p); + } |