Thread: [Udt-java-commits] SF.net SVN: udt-java:[23] udt-java/trunk
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2010-04-22 10:07:00
|
Revision: 23 http://udt-java.svn.sourceforge.net/udt-java/?rev=23&view=rev Author: bschuller Date: 2010-04-22 10:06:54 +0000 (Thu, 22 Apr 2010) Log Message: ----------- Modified Paths: -------------- udt-java/trunk/README udt-java/trunk/src/main/java/udt/UDTCongestionControl.java udt-java/trunk/src/main/java/udt/UDTPacket.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/packets/ControlPacket.java udt-java/trunk/src/main/java/udt/packets/DataPacket.java udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/util/SendFile.java udt-java/trunk/src/main/java/udt/util/UDTStatistics.java udt-java/trunk/src/main/java/udt/util/Util.java udt-java/trunk/src/test/java/udt/TestList.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Modified: udt-java/trunk/README =================================================================== --- udt-java/trunk/README 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/README 2010-04-22 10:06:54 UTC (rev 23) @@ -17,7 +17,7 @@ To download a file from the server, - bin/receive_file <server_host> <server_port> <remote_filename> <local_filename> + bin/receive-file <server_host> <server_port> <remote_filename> <local_filename> # Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-22 10:06:54 UTC (rev 23) @@ -21,7 +21,7 @@ private final UDTStatistics statistics; //round trip time in microseconds - private long roundTripTime=2*Util.getSYNTime(); + private long roundTripTime=0; //rate in packets per second private long packetArrivalRate=0; @@ -124,7 +124,6 @@ if(slowStartPhase){ congestionWindowSize+=ackSeqno-lastAckSeqNumber; lastAckSeqNumber = ackSeqno; - //but not beyond a maximum size if(congestionWindowSize>session.getFlowWindowSize()){ System.out.println("slow start ends on ACK"); @@ -163,7 +162,6 @@ double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD()); packetSendingPeriod=factor*packetSendingPeriod; //packetSendingPeriod=0.995*packetSendingPeriod; - //System.out.println("dec snd factor "+factor+" to "+packetSendingPeriod); statistics.setSendPeriod(packetSendingPeriod); } @@ -173,7 +171,7 @@ //see spec page 16 private double computeNumOfIncreasingPacket (){ - //difference in link capacity and sending speed, in packets per second + //difference between link capacity and sending speed, in packets per second double remaining=estimatedLinkCapacity-1000000.0/packetSendingPeriod; if(remaining<=0){ Modified: udt-java/trunk/src/main/java/udt/UDTPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTPacket.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/UDTPacket.java 2010-04-22 10:06:54 UTC (rev 23) @@ -32,7 +32,7 @@ package udt; -public interface UDTPacket { +public interface UDTPacket extends Comparable<UDTPacket>{ public long getMessageNumber(); @@ -67,4 +67,7 @@ public boolean isConnectionHandshake(); public UDTSession getSession(); + + public long getPacketSequenceNumber(); + } Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-22 10:06:54 UTC (rev 23) @@ -33,8 +33,8 @@ package udt; import java.io.IOException; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -110,7 +110,7 @@ long packetArrivalSpeed; //round trip time, calculated from ACK/ACK2 pairs - long roundTripTime=50*1000; + long roundTripTime=0; //round trip time variance long roundTripTimeVar=roundTripTime/2; @@ -135,8 +135,8 @@ //buffer size for storing data private final long bufferSize; - //stores packets to be sent - private final BlockingQueue<UDTPacket>handoffQueue=new ArrayBlockingQueue<UDTPacket>(32); + //stores received packets to be sent + private final BlockingQueue<UDTPacket>handoffQueue; private Thread receiverThread; @@ -162,10 +162,14 @@ packetHistoryWindow = new PacketHistoryWindow(16); receiverLossList = new ReceiverLossList(); packetPairWindow = new PacketPairWindow(16); - nextACK=Util.getCurrentTime()+ACK_INTERVAL; - nextNAK=(long)(Util.getCurrentTime()+1.5*NAK_INTERVAL); - nextEXP=Util.getCurrentTime()+2*EXP_INTERVAL; + largestReceivedSeqNumber=session.getInitialSequenceNumber()-1; bufferSize=session.getReceiveBufferSize(); + + //incoming packets are ordered by sequence number, with control packets having + //preference over data packets + handoffQueue=//new ArrayBlockingQueue<UDTPacket>(session.getFlowWindowSize()); + new PriorityBlockingQueue<UDTPacket>(session.getFlowWindowSize()); + start(); } @@ -174,6 +178,9 @@ Runnable r=new Runnable(){ public void run(){ try{ + nextACK=Util.getCurrentTime()+ACK_INTERVAL; + nextNAK=(long)(Util.getCurrentTime()+1.5*NAK_INTERVAL); + nextEXP=Util.getCurrentTime()+2*EXP_INTERVAL; while(!stopped){ receiverAlgorithm(); } @@ -191,6 +198,8 @@ /* * packets are written by the endpoint */ + long i=0; + long mean=0; protected void receive(UDTPacket p)throws IOException{ handoffQueue.offer(p); } @@ -416,7 +425,6 @@ protected void sendNAK(List<Long>sequenceNumbers)throws IOException{ if(sequenceNumbers.size()==0)return; NegativeAcknowledgement nAckPacket= new NegativeAcknowledgement(); - nAckPacket.addLossInfo(sequenceNumbers); nAckPacket.setSession(session); nAckPacket.setDestinationID(session.getDestination().getSocketID()); @@ -486,7 +494,6 @@ if(roundTripTime>0)roundTripTime = (roundTripTime*7 + rtt)/8; else roundTripTime = rtt; roundTripTimeVar = (roundTripTimeVar* 3 + Math.abs(roundTripTimeVar- rtt)) / 4; - ACK_INTERVAL=4*roundTripTime+roundTripTimeVar+Util.getSYNTime(); NAK_INTERVAL=ACK_INTERVAL; statistics.setRTT(roundTripTime, roundTripTimeVar); Modified: udt-java/trunk/src/main/java/udt/packets/ControlPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-04-22 10:06:54 UTC (rev 23) @@ -191,6 +191,14 @@ this.session = session; } + public long getPacketSequenceNumber(){ + return -1; + } + + public int compareTo(UDTPacket other){ + return (int)(getPacketSequenceNumber()-other.getPacketSequenceNumber()); + } + public static enum ControlPacketType { CONNECTION_HANDSHAKE, Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-22 10:06:54 UTC (rev 23) @@ -38,7 +38,7 @@ import udt.UDTPacket; import udt.UDTSession; -public class DataPacket implements UDTPacket, Comparable<DataPacket>{ +public class DataPacket implements UDTPacket, Comparable<UDTPacket>{ private byte[] data ; private long packetSequenceNumber; @@ -178,9 +178,7 @@ this.session = session; } - //Compare data packets by their sequence number - public int compareTo(DataPacket other){ - return (int)(other.packetSequenceNumber-packetSequenceNumber); + public int compareTo(UDTPacket other){ + return (int)(getPacketSequenceNumber()-other.getPacketSequenceNumber()); } - } Modified: udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java 2010-04-22 10:06:54 UTC (rev 23) @@ -36,8 +36,6 @@ /** * a circular array of each sent Ack and the time it is sent out - * - * */ public class AckHistoryWindow extends CircularArray<AckHistoryEntry>{ @@ -47,20 +45,20 @@ /** * return the time for the given seq no, or <code>-1 </code> if not known - * @param seqNo + * @param ackNumber */ - public long getTime(long seqNo){ + public long getTime(long ackNumber){ for(AckHistoryEntry obj: circularArray){ - if(obj.getAckNumber()==seqNo){ + if(obj.getAckNumber()==ackNumber){ return obj.getSentTime(); } } return -1; } - public AckHistoryEntry getEntry(long seqNo){ + public AckHistoryEntry getEntry(long ackNumber){ for(AckHistoryEntry obj: circularArray){ - if(obj.getAckNumber()==seqNo){ + if(obj.getAckNumber()==ackNumber){ return obj; } } Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-22 10:06:54 UTC (rev 23) @@ -49,7 +49,7 @@ private final PriorityBlockingQueue<ReceiverLossListEntry>backingList; public ReceiverLossList(){ - backingList = new PriorityBlockingQueue<ReceiverLossListEntry>(16); + backingList = new PriorityBlockingQueue<ReceiverLossListEntry>(32); } public void insert(ReceiverLossListEntry entry){ @@ -94,11 +94,10 @@ */ public List<Long>getFilteredSequenceNumbers(long RTT, boolean doFeedback){ List<Long>result=new ArrayList<Long>(); - long now=Util.getCurrentTime(); ReceiverLossListEntry[]sorted=backingList.toArray(new ReceiverLossListEntry[0]); Arrays.sort(sorted); for(ReceiverLossListEntry e: sorted){ - if( (now-e.getLastFeedbackTime())>e.getK()*RTT){ + if( (Util.getCurrentTime()-e.getLastFeedbackTime())>e.getK()*RTT){ result.add(e.getSequenceNumber()); if(doFeedback)e.feedback(); } Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-22 10:06:54 UTC (rev 23) @@ -39,7 +39,7 @@ */ public class ReceiverLossListEntry implements Comparable<ReceiverLossListEntry> { - private final long sequenceNumber ; + private final long sequenceNumber; private long lastFeedbacktime; private long k = 2; Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-22 10:06:54 UTC (rev 23) @@ -44,7 +44,7 @@ * create a new sender lost list */ public SenderLossList(){ - backingList = new PriorityBlockingQueue<SenderLossListEntry>(16); + backingList = new PriorityBlockingQueue<SenderLossListEntry>(32); } public void insert(SenderLossListEntry obj){ Modified: udt-java/trunk/src/main/java/udt/util/SendFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-22 10:06:54 UTC (rev 23) @@ -36,6 +36,7 @@ import java.io.FileInputStream; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.text.NumberFormat; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Logger; @@ -60,9 +61,10 @@ //TODO configure pool size private final ExecutorService threadPool=Executors.newFixedThreadPool(3); - + public SendFile(int serverPort){ this.serverPort=serverPort; + } @Override @@ -115,8 +117,11 @@ private final UDTSocket socket; + private final NumberFormat format=NumberFormat.getNumberInstance(); + public RequestRunner(UDTSocket socket){ this.socket=socket; + format.setMaximumFractionDigits(3); } public void run(){ @@ -149,7 +154,8 @@ Util.copy(fis, out, size, true); long end=System.currentTimeMillis(); System.out.println(socket.getSession().getStatistics().toString()); - System.out.println("[SendFile] Rate: "+1000*size/1024/1024/(end-start)+" MBytes/sec."); + double rate=1000.0*size/1024/1024/(end-start); + System.out.println("[SendFile] Rate: "+format.format(rate)+" MBytes/sec. "+format.format(8*rate)+" MBit/sec."); socket.getSession().getStatistics().writeParameterHistory(new File("udtstats-"+System.currentTimeMillis()+".csv")); }finally{ fis.close(); Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-22 10:06:54 UTC (rev 23) @@ -230,5 +230,5 @@ } return hexString.toString(); } - + } Modified: udt-java/trunk/src/main/java/udt/util/Util.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-22 10:06:54 UTC (rev 23) @@ -149,5 +149,5 @@ p.setPort(clientPort); endpoint.sendRaw(p); } - + } Modified: udt-java/trunk/src/test/java/udt/TestList.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestList.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/test/java/udt/TestList.java 2010-04-22 10:06:54 UTC (rev 23) @@ -1,5 +1,10 @@ package udt; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; + import junit.framework.TestCase; +import udt.packets.DataPacket; +import udt.packets.KeepAlive; import udt.receiver.AckHistoryEntry; import udt.receiver.AckHistoryWindow; import udt.receiver.PacketHistoryWindow; @@ -58,33 +63,23 @@ for(int i=0;i<values.length;i++){ p.add(values[i]); } - //assertEquals(10.0d, p.computeMedianTimeInterval()); + assertEquals(4.0d, p.computeMedianTimeInterval()); - System.out.println(p.toString()); - System.out.println("MedianTimeInterval: "+p.computeMedianTimeInterval()); + long[] arrivaltimes = {12, 12, 12, 12}; + PacketPairWindow p1=new PacketPairWindow(16); + for(int i=0;i<values.length;i++){ + p1.add(arrivaltimes[i]); + } + assertEquals(12.0d, p1.computeMedianTimeInterval()); - System.out.println(p.toString()); - System.out.println("MedianTimeInterval: "+p.computeMedianTimeInterval()); - - //assertEquals(10.0d, p.); - - //long[] arrivaltimes = {12, 12, 12, 12}; - //PacketPairWindow p1=new PacketPairWindow(16); - //for(int i=0;i<values.length;i++){ - // p1.insert(arrivaltimes[i]); - //} - //assertEquals(12.0d, p1.computeMedianTimeInterval()); - } public void testAckHistoryWindow(){ - AckHistoryEntry ackSeqNrA = new AckHistoryEntry( 0,1,1263465050); - + AckHistoryEntry ackSeqNrA = new AckHistoryEntry(0,1,1263465050); AckHistoryEntry ackSeqNrB = new AckHistoryEntry(1,2,1263465054); - AckHistoryEntry ackSeqNrC = new AckHistoryEntry(2,3,1263465058); AckHistoryWindow recvWindow = new AckHistoryWindow(3); @@ -92,10 +87,7 @@ recvWindow.add(ackSeqNrB); recvWindow.add(ackSeqNrC); AckHistoryEntry entryA = recvWindow.getEntry(1); - long storageTimeA = entryA.getSentTime(); - long storageTimeA_ =recvWindow.getTime(1); - System.out.println("storageTimeA bzw A_ "+storageTimeA+" "+storageTimeA_); - + assertEquals(1263465050, entryA.getSentTime()); } public void testSenderLossList1(){ @@ -110,4 +102,30 @@ assertEquals(C,oldest); } + public void testReceiverInputQueue(){ + BlockingQueue<UDTPacket> q=new PriorityBlockingQueue<UDTPacket>(5); + UDTPacket control = new KeepAlive(); + DataPacket d1=new DataPacket(); + d1.setPacketSequenceNumber(1); + DataPacket d2=new DataPacket(); + d2.setPacketSequenceNumber(2); + q.offer(d2); + q.offer(d1); + q.offer(control); + + UDTPacket p1=q.poll(); + assertTrue(p1.isControlPacket()); + + UDTPacket p2=q.poll(); + assertFalse(p2.isControlPacket()); + //check ordering by sequence number + assertEquals(1,p2.getPacketSequenceNumber()); + + UDTPacket p3=q.poll(); + assertFalse(p3.isControlPacket()); + assertEquals(2,p3.getPacketSequenceNumber()); + + + } + } Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-22 10:06:54 UTC (rev 23) @@ -3,6 +3,7 @@ import java.io.File; import java.net.InetAddress; import java.security.MessageDigest; +import java.text.NumberFormat; import java.util.Random; import java.util.logging.Level; import java.util.logging.Logger; @@ -20,7 +21,7 @@ boolean running=false; //how many - int num_packets=100; + int num_packets=200; //how large is a single packet int size=1*1024*1024; @@ -38,7 +39,11 @@ doTest(); } + private final NumberFormat format=NumberFormat.getNumberInstance(); + protected void doTest()throws Exception{ + format.setMaximumFractionDigits(2); + if(!running)runServer(); UDTClient client=new UDTClient(InetAddress.getByName("localhost"),12345); client.connect("localhost", 65321); @@ -58,7 +63,8 @@ long block=System.currentTimeMillis(); client.sendBlocking(data); digest.update(data); - System.out.println("Sent block <"+i+"> in "+(System.currentTimeMillis()-block)+" ms"); + double took=System.currentTimeMillis()-block; + System.out.println("Sent block <"+i+"> in "+took+" ms, rate: "+format.format(size/(1024*took))+ " Mbytes/sec"); } end=System.currentTimeMillis(); client.shutdown(); @@ -70,7 +76,7 @@ System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms"); double mbytes=N/(end-start)/1024; double mbit=8*mbytes; - System.out.println("Rate: "+(int)mbytes+" Mbytes/sec "+(int)mbit+" Mbit/sec"); + System.out.println("Rate: "+format.format(mbytes)+" Mbytes/sec "+format.format(mbit)+" Mbit/sec"); System.out.println("Server received: "+total); assertEquals(N,total); @@ -78,7 +84,7 @@ System.out.println("MD5 hash of data received: "+md5_received); System.out.println(client.getStatistics()); - //assertEquals(md5_sent,md5_received); + assertEquals(md5_sent,md5_received); //store stat history to csv file client.getStatistics().writeParameterHistory(File.createTempFile("/udtstats-",".csv")); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2010-04-28 14:52:04
|
Revision: 28 http://udt-java.svn.sourceforge.net/udt-java/?rev=28&view=rev Author: bschuller Date: 2010-04-28 14:51:57 +0000 (Wed, 28 Apr 2010) Log Message: ----------- some cleanup Modified Paths: -------------- udt-java/trunk/pom.xml udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTCongestionControl.java udt-java/trunk/src/main/java/udt/UDTInputStream.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/UDTSocket.java udt-java/trunk/src/main/java/udt/packets/Destination.java udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java udt-java/trunk/src/main/java/udt/util/MeanValue.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java udt-java/trunk/src/main/java/udt/util/SendFile.java udt-java/trunk/src/main/java/udt/util/Util.java udt-java/trunk/src/test/java/udt/TestList.java udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java Removed Paths: ------------- udt-java/trunk/src/main/java/udt/util/FlowWindow.java Modified: udt-java/trunk/pom.xml =================================================================== --- udt-java/trunk/pom.xml 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/pom.xml 2010-04-28 14:51:57 UTC (rev 28) @@ -5,7 +5,7 @@ <artifactId>udt-java</artifactId> <packaging>jar</packaging> <name>UDT Java implementation</name> - <version>0.2-SNAPSHOT</version> + <version>0.4-SNAPSHOT</version> <url>http://sourceforge.net/projects/udt-java</url> <developers> <developer> Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-28 14:51:57 UTC (rev 28) @@ -50,6 +50,7 @@ import udt.packets.ConnectionHandshake; import udt.packets.Destination; import udt.packets.PacketFactory; +import udt.util.MeanValue; import udt.util.UDTThreadFactory; /** @@ -82,7 +83,7 @@ private volatile boolean stopped=false; public static final int DATAGRAM_SIZE=1500; - + /** * bind to any local port on the given host address * @param localAddress @@ -113,6 +114,8 @@ sessionHandoff=new SynchronousQueue<UDTSession>(); //set a time out to avoid blocking in doReceive() dgSocket.setSoTimeout(1000); + //buffer size + dgSocket.setReceiveBufferSize(512*1024); } /** @@ -237,6 +240,7 @@ */ private long lastDestID=-1; private UDTSession lastSession; + MeanValue v=new MeanValue(true,64); protected void doReceive()throws IOException{ try{ try{ @@ -294,12 +298,11 @@ logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); } } - + protected void doSend(UDTPacket packet)throws IOException{ byte[]data=packet.getEncoded(); - Destination dest=packet.getSession().getDestination(); - DatagramPacket dgp = new DatagramPacket(data, data.length, - dest.getAddress() , dest.getPort()); + DatagramPacket dgp = packet.getSession().getDatagram(); + dgp.setData(data); dgSocket.send(dgp); } Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-28 14:51:57 UTC (rev 28) @@ -30,7 +30,7 @@ private long estimatedLinkCapacity=0; // Packet sending period = packet send interval, in microseconds - private double packetSendingPeriod=1; + private double packetSendingPeriod=0; // Congestion window size, in packets private long congestionWindowSize=16; @@ -228,7 +228,10 @@ // c. Record the current largest sent sequence number (LastDecSeq). lastDecreaseSeqNo= currentMaxSequenceNumber; } - + + //enforce upper limit on send period... + //packetSendingPeriod=Math.min(packetSendingPeriod, 2*roundTripTime); + statistics.setSendPeriod(packetSendingPeriod); return; } Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-28 14:51:57 UTC (rev 28) @@ -34,10 +34,10 @@ import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import udt.util.FlowWindow; import udt.util.UDTStatistics; /** @@ -54,7 +54,7 @@ //inbound application data, in-order, and ready for reading //by the application - private final FlowWindow<AppData>appData; + private final PriorityBlockingQueue<AppData>appData; private final UDTStatistics statistics; @@ -78,13 +78,9 @@ public UDTInputStream(UDTSocket socket, UDTStatistics statistics)throws IOException{ this.socket=socket; this.statistics=statistics; - appData=new FlowWindow<AppData>(getFlowWindowSize()); + appData=new PriorityBlockingQueue<AppData>(128); } - private int getFlowWindowSize(){ - if(socket!=null)return 2*socket.getSession().getFlowWindowSize(); - else return 128; - } /** * create a new {@link UDTInputStream} connected to the given socket * @param socket - the {@link UDTSocket} @@ -172,6 +168,7 @@ } } else currentChunk=appData.poll(10, TimeUnit.MILLISECONDS); + }catch(InterruptedException ie){ IOException ex=new IOException(); ex.initCause(ie); Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-28 14:51:57 UTC (rev 28) @@ -167,9 +167,7 @@ //incoming packets are ordered by sequence number, with control packets having //preference over data packets - handoffQueue=//new ArrayBlockingQueue<UDTPacket>(session.getFlowWindowSize()); - new PriorityBlockingQueue<UDTPacket>(session.getFlowWindowSize()); - + handoffQueue=new PriorityBlockingQueue<UDTPacket>(session.getFlowWindowSize()); start(); } @@ -198,8 +196,6 @@ /* * packets are written by the endpoint */ - long i=0; - long mean=0; protected void receive(UDTPacket p)throws IOException{ handoffQueue.offer(p); } @@ -247,6 +243,7 @@ } processUDTPacket(packet); } + Thread.yield(); } @@ -326,6 +323,7 @@ protected void processUDTPacket(UDTPacket p)throws IOException{ //(3).Check the packet type and process it according to this. + if(p instanceof DataPacket){ DataPacket dp=(DataPacket)p; onDataPacketReceived(dp); @@ -340,8 +338,6 @@ onShutdown(); } - //other packet types? - } //every nth packet will be discarded... for testing only of course @@ -375,11 +371,8 @@ //store current time lastDataPacketArrivalTime=currentDataPacketArrivalTime; - if(!session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData())){ - //no left space in application data buffer->drop this packet - return; - } - + session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData()); + //(6).number of detected lossed packet /*(6.a).if the number of the current data packet is greater than LSRN+1, put all the sequence numbers between (but excluding) these two values @@ -387,13 +380,11 @@ if(currentSequenceNumber>largestReceivedSeqNumber+1){ sendNAK(currentSequenceNumber); } - else{ - if(currentSequenceNumber<largestReceivedSeqNumber){ + else if(currentSequenceNumber<largestReceivedSeqNumber){ /*(6.b).if the sequence number is less than LRSN,remove it from * the receiver's loss list */ receiverLossList.remove(currentSequenceNumber); - } } statistics.incNumberOfReceivedDataPackets(); @@ -422,6 +413,7 @@ receiverLossList.insert(detectedLossSeqNumber); } endpoint.doSend(nAckPacket); + statistics.incNumberOfNAKSent(); } protected void sendNAK(List<Long>sequenceNumbers)throws IOException{ @@ -447,7 +439,7 @@ estimateLinkCapacity=packetPairWindow.getEstimatedLinkCapacity(); acknowledgmentPkt.setEstimatedLinkCapacity(estimateLinkCapacity); //set the packet arrival rate - packetArrivalSpeed=(long)packetHistoryWindow.getPacketArrivalSpeed(); + packetArrivalSpeed=packetHistoryWindow.getPacketArrivalSpeed(); acknowledgmentPkt.setPacketReceiveRate(packetArrivalSpeed); endpoint.doSend(acknowledgmentPkt); Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-28 14:51:57 UTC (rev 28) @@ -96,36 +96,46 @@ //last acknowledge number, initialised to the initial sequence number private long lastAckSequenceNumber; - //size of the send queue - public final int sendQueueLength; + private volatile boolean started=false; private volatile boolean stopped=false; - private volatile AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>(); + private final CountDownLatch startLatch=new CountDownLatch(1); - private volatile AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); + private final AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>(); + private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); + public UDTSender(UDTSession session,UDPEndPoint endpoint){ if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); this.endpoint= endpoint; this.session=session; - statistics=session.getStatistics(); - sendQueueLength=64;//session.getFlowWindowSize(); senderLossList=new SenderLossList(); - sendBuffer=new ConcurrentHashMap<Long, DataPacket>(sendQueueLength,0.75f,2); - sendQueue = new LinkedBlockingQueue<DataPacket>(sendQueueLength); + sendBuffer=new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(),0.75f,2); + sendQueue = new LinkedBlockingQueue<DataPacket>(1000); lastAckSequenceNumber=session.getInitialSequenceNumber(); waitForAckLatch.set(new CountDownLatch(1)); waitForSeqAckLatch.set(new CountDownLatch(1)); - start(); + doStart(); } + /** + * start the sender thread + */ + public void start(){ + logger.info("Starting sender for "+session); + startLatch.countDown(); + started=true; + } + //starts the sender algorithm - private void start(){ + private void doStart(){ Runnable r=new Runnable(){ public void run(){ try{ + //wait until explicitely started + startLatch.await(); while(!stopped){ senderAlgorithm(); } @@ -150,7 +160,7 @@ * @param data * @throws IOException * @throws InterruptedException - */ + */ private void send(DataPacket p)throws IOException{ synchronized(sendLock){ endpoint.doSend(p); @@ -161,15 +171,6 @@ } /** - * writes a data packet into the sendQueue - * @return <code>true</code>if the packet was added, <code>false</code> if the - * packet could not be added because the queue was full - */ - protected boolean sendUdtPacket(DataPacket p)throws IOException{ - return sendQueue.offer(p); - } - - /** * writes a data packet into the sendQueue, waiting at most for the specified time * if this is not possible due to a full send queue * @@ -183,6 +184,7 @@ * @throws InterruptedException */ protected boolean sendUdtPacket(DataPacket p, int timeout, TimeUnit units)throws IOException,InterruptedException{ + if(!started)start(); return sendQueue.offer(p,timeout,units); } @@ -244,7 +246,7 @@ */ protected void onNAKPacketReceived(NegativeAcknowledgement nak){ waitForAckLatch.get().countDown(); - + for(Integer i: nak.getDecodedLossInfo()){ senderLossList.insert(new SenderLossListEntry(i)); } @@ -252,12 +254,12 @@ session.getSocket().getReceiver().resetEXPTimer(); statistics.incNumberOfNAKReceived(); statistics.storeParameters(); - + if(logger.isLoggable(Level.FINER)){ logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, " - +"set send period to "+session.getCongestionControl().getSendInterval()); + +"set send period to "+session.getCongestionControl().getSendInterval()); } - + return; } @@ -282,56 +284,45 @@ */ public void senderAlgorithm()throws InterruptedException, IOException{ long iterationStart=Util.getCurrentTime(); + //if the sender's loss list is not empty SenderLossListEntry entry=senderLossList.getFirstEntry(); if (entry!=null) { - long seqNumber = entry.getSequenceNumber(); - //TODO - //if the current seqNumber is 16n,check the timeOut in the - //loss list and send a message drop request. - //if((seqNumber%16)==0){ - //sendLossList.checkTimeOut(timeToLive); - //} - try { - //retransmit the packet with the first entry in the list - //as sequence number and remove it from the list - DataPacket pktToRetransmit = sendBuffer.get(seqNumber); - if(pktToRetransmit!=null){ - endpoint.doSend(pktToRetransmit); - statistics.incNumberOfRetransmittedDataPackets(); - } - }catch (Exception e) { - logger.log(Level.WARNING,"",e); - } - // return; + handleResubmit(entry); } - - //if the number of unacknowledged data packets does not exceed the congestion - //and the flow window sizes, pack a new packet - int unAcknowledged=unacknowledged.get(); - if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() - && unAcknowledged<session.getFlowWindowSize()){ - - if(sendQueue.size()==0){ - //Thread.yield(); + else + { + //if the number of unacknowledged data packets does not exceed the congestion + //and the flow window sizes, pack a new packet + int unAcknowledged=unacknowledged.get(); + + if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() + && unAcknowledged<session.getFlowWindowSize()){ + //check for application data + DataPacket dp=sendQueue.poll();//10*Util.getSYNTime(),TimeUnit.MICROSECONDS); + if(dp!=null){ + send(dp); + largestSentSequenceNumber=dp.getPacketSequenceNumber(); + } + else { + Thread.yield(); + return; + } + }else{ + //congestion window full, should we *really* wait for an ack?! + if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){ + statistics.incNumberOfCCWindowExceededEvents(); + } + Thread.sleep(1); + //waitForAck(); return; } - DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS); - if(dp!=null){ - send(dp); - largestSentSequenceNumber=dp.getPacketSequenceNumber(); - } - }else{ - //should we *really* wait for an ack?! - if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){ - statistics.incNumberOfCCWindowExceededEvents(); - } - Thread.sleep(1); - //waitForAck(); } //wait + + double snd=session.getCongestionControl().getSendInterval(); long passed=Util.getCurrentTime()-iterationStart; int x=0; @@ -339,17 +330,42 @@ if(x++==0)statistics.incNumberOfCCSlowDownEvents(); //we cannot wait with microsecond precision if(snd-passed>750)Thread.sleep(1); - else Thread.yield(); + else if((snd-passed)/snd > 0.9){ + return; + } passed=Util.getCurrentTime()-iterationStart; } - + } /** + * re-submits an entry from the sender loss list + * @param entry + */ + protected void handleResubmit(SenderLossListEntry entry){ + long seqNumber = entry.getSequenceNumber(); + //TODO + //if the current seqNumber is 16n,check the timeOut in the + //loss list and send a message drop request. + //if((seqNumber%16)==0){ + //sendLossList.checkTimeOut(timeToLive); + //} + try { + //retransmit the packet and remove it from the list + DataPacket pktToRetransmit = sendBuffer.get(seqNumber); + if(pktToRetransmit!=null){ + endpoint.doSend(pktToRetransmit); + statistics.incNumberOfRetransmittedDataPackets(); + } + }catch (Exception e) { + logger.log(Level.WARNING,"",e); + } + } + + /** * for processing EXP event (see spec. p 13) */ protected void putUnacknowledgedPacketsIntoLossList(){ - synchronized (sendLock) { for(Long l: sendBuffer.keySet()){ senderLossList.insert(new SenderLossListEntry(l)); @@ -391,7 +407,7 @@ return largestSentSequenceNumber>=sequenceNumber; } - + boolean haveLostPackets(){ return !senderLossList.isEmpty(); } Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-28 14:51:57 UTC (rev 28) @@ -32,6 +32,7 @@ package udt; +import java.net.DatagramPacket; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -64,11 +65,14 @@ protected final CongestionControl cc; + //cache dgPacket (peer stays the same always) + private DatagramPacket dgPacket; + /** * flow window size, i.e. how many data packets are * in-flight at a single time */ - protected int flowWindowSize=1024; + protected int flowWindowSize=64; /** * remote UDT entity (address and socket ID) @@ -105,7 +109,7 @@ statistics=new UDTStatistics(description); mySocketID=nextSocketID.incrementAndGet(); this.destination=destination; - + this.dgPacket=new DatagramPacket(new byte[0],0,destination.getAddress(),destination.getPort()); //init configurable CC String clazzP=System.getProperty(CC_CLASS,UDTCongestionControl.class.getName()); Object ccObject=null; @@ -210,4 +214,7 @@ this.initialSequenceNumber=initialSequenceNumber; } + public DatagramPacket getDatagram(){ + return dgPacket; + } } Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-28 14:51:57 UTC (rev 28) @@ -148,7 +148,7 @@ */ protected void doWrite(byte[]data, int offset, int length)throws IOException{ try{ - doWrite(data, offset, length, 5, TimeUnit.MILLISECONDS); + doWrite(data, offset, length, Integer.MAX_VALUE, TimeUnit.MILLISECONDS); }catch(InterruptedException ie){ IOException io=new IOException(); io.initCause(ie); @@ -163,14 +163,13 @@ * @param length * @param timeout * @param units - * @throws IOException + * @throws IOException - if data cannot be sent * @throws InterruptedException */ protected void doWrite(byte[]data, int offset, int length, int timeout, TimeUnit units)throws IOException,InterruptedException{ int chunksize=session.getDatagramSize()-24;//need some bytes for the header ByteBuffer bb=ByteBuffer.wrap(data,offset,length); long seqNo=0; - int i=0; while(bb.remaining()>0){ int len=Math.min(bb.remaining(),chunksize); byte[]chunk=new byte[len]; @@ -182,10 +181,9 @@ packet.setDestinationID(session.getDestination().getSocketID()); packet.setData(chunk); //put the packet into the send queue - while(!sender.sendUdtPacket(packet, timeout, units)){ - Thread.sleep(1); + if(!sender.sendUdtPacket(packet, timeout, units)){ + throw new IOException("Queue full"); } - i++; } if(length>0)active=true; } Modified: udt-java/trunk/src/main/java/udt/packets/Destination.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/Destination.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/packets/Destination.java 2010-04-28 14:51:57 UTC (rev 28) @@ -33,7 +33,6 @@ package udt.packets; import java.net.InetAddress; -import java.net.UnknownHostException; public class Destination { @@ -49,7 +48,7 @@ this.port=port; } - public InetAddress getAddress()throws UnknownHostException{ + public InetAddress getAddress(){ return address; } Modified: udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2010-04-28 14:51:57 UTC (rev 28) @@ -34,13 +34,8 @@ import udt.util.CircularArray; - - /** * A circular array that records the packet arrival times - * - * - * */ public class PacketHistoryWindow extends CircularArray<Long>{ @@ -57,7 +52,7 @@ * (see specification section 6.2, page 12) * @return the current value */ - public double getPacketArrivalSpeed(){ + public long getPacketArrivalSpeed(){ if(!haveOverflow)return 0; int num=max-1; double AI; @@ -94,7 +89,7 @@ else{ medianPacketArrivalSpeed=0; } - return medianPacketArrivalSpeed; + return (long)Math.ceil(medianPacketArrivalSpeed); } } Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-28 14:51:57 UTC (rev 28) @@ -72,4 +72,8 @@ public long size(){ return backingList.size(); } + + public String toString(){ + return backingList.toString(); + } } Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-28 14:51:57 UTC (rev 28) @@ -95,6 +95,6 @@ } public String toString(){ - return "lossListEntry-"+sequenceNumber; + return "lost-"+sequenceNumber; } } Deleted: udt-java/trunk/src/main/java/udt/util/FlowWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-28 14:51:57 UTC (rev 28) @@ -1,87 +0,0 @@ -/********************************************************************************* - * Copyright (c) 2010 Forschungszentrum Juelich GmbH - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * (1) Redistributions of source code must retain the above copyright notice, - * this list of conditions and the disclaimer at the end. Redistributions in - * binary form must reproduce the above copyright notice, this list of - * conditions and the following disclaimer in the documentation and/or other - * materials provided with the distribution. - * - * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its - * contributors may be used to endorse or promote products derived from this - * software without specific prior written permission. - * - * DISCLAIMER - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - *********************************************************************************/ - -package udt.util; - -import java.util.concurrent.PriorityBlockingQueue; - -/** - * bounded queue - * - */ -public class FlowWindow<E> extends PriorityBlockingQueue<E> { - - private static final long serialVersionUID=1l; - - private volatile int capacity; - - /** - * create a new flow window with the given size - * - * @param size - the initial size of the flow window - */ - public FlowWindow(int size){ - super(); - this.capacity=size; - } - - /** - * create a new flow window with the default size of 16 - */ - public FlowWindow(){ - this(16); - } - - public void setCapacity(int newSize){ - capacity=newSize; - } - - public int getCapacity(){ - return capacity; - } - - /** - * try to add an element to the queue, return false if it is not possible - */ - @Override - public boolean offer(E e) { - if(contains(e)){ - return true; - } - if(size()<capacity){ - return super.offer(e); - }else return false; - } - - - -} Modified: udt-java/trunk/src/main/java/udt/util/MeanValue.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-04-28 14:51:57 UTC (rev 28) @@ -13,14 +13,36 @@ private final NumberFormat format; + private final boolean verbose; + private final long nValue; + private long start; + + private String msg; + public MeanValue(){ + this(false, 64); + } + + public MeanValue(boolean verbose){ + this(verbose, 64); + } + + public MeanValue(boolean verbose, int nValue){ format=NumberFormat.getNumberInstance(); format.setMaximumFractionDigits(2); + this.verbose=verbose; + this.nValue=nValue; + begin(); } + public void addValue(double value){ mean=(mean*n+value)/(n+1); n++; + if(verbose && n % nValue == 1){ + if(msg!=null)System.out.print(msg+" "); + System.out.println(getFormattedMean()); + } } public double getMean(){ @@ -35,4 +57,16 @@ mean=0; n=0; } + + public void begin(){ + start=Util.getCurrentTime(); + } + + public void end(){ + addValue(Util.getCurrentTime()-start); + } + public void end(String msg){ + this.msg=msg; + addValue(Util.getCurrentTime()-start); + } } Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-28 14:51:57 UTC (rev 28) @@ -108,6 +108,9 @@ System.out.println("[ReceiveFile] Rate: "+(int)mbytes+" MBytes/sec. "+(int)mbit+" MBit/sec."); client.shutdown(); + + if(verbose)System.out.println(client.getStatistics()); + }finally{ fos.close(); } Modified: udt-java/trunk/src/main/java/udt/util/SendFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-28 14:51:57 UTC (rev 28) @@ -151,7 +151,7 @@ out.write(PacketUtil.encode(size)); long start=System.currentTimeMillis(); //and send the file - Util.copy(fis, out, size, true); + Util.copy(fis, out, size, false); long end=System.currentTimeMillis(); System.out.println(socket.getSession().getStatistics().toString()); double rate=1000.0*size/1024/1024/(end-start); Modified: udt-java/trunk/src/main/java/udt/util/Util.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-28 14:51:57 UTC (rev 28) @@ -121,7 +121,7 @@ * @throws IOException */ public static void copy(InputStream source, OutputStream target, long size, boolean flush)throws IOException{ - byte[]buf=new byte[1*1024*1024]; + byte[]buf=new byte[65536]; int c; long read=0; while(true){ @@ -149,5 +149,5 @@ p.setPort(clientPort); endpoint.sendRaw(p); } - + } Modified: udt-java/trunk/src/test/java/udt/TestList.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestList.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/test/java/udt/TestList.java 2010-04-28 14:51:57 UTC (rev 28) @@ -12,7 +12,6 @@ import udt.sender.SenderLossList; import udt.sender.SenderLossListEntry; import udt.util.CircularArray; -import udt.util.FlowWindow; /* * tests for the various list and queue classes @@ -33,27 +32,16 @@ c.add(11); System.out.println(c); } - - public void testFlowWindow(){ - FlowWindow<Long>f=new FlowWindow<Long>(5); - for(int i=0;i<5;i++){ - System.out.println(i); - assertTrue(f.add(Long.valueOf(i))); - } - assertFalse(f.add(0l)); - f.setCapacity(6); - assertTrue(f.add(0l)); - } - + public void testPacketHistoryWindow(){ PacketHistoryWindow packetHistoryWindow = new PacketHistoryWindow(16); - - for(int i=0;i<17;i++){ - packetHistoryWindow.add(i*5000l); + long offset=1000000; + for(int i=0;i<28;i++){ + packetHistoryWindow.add(offset+i*5000l); } //packets arrive every 5 ms, so packet arrival rate is 200/sec - assertEquals(200.0,packetHistoryWindow.getPacketArrivalSpeed()); + assertEquals(200,packetHistoryWindow.getPacketArrivalSpeed()); } @@ -109,6 +97,9 @@ d1.setPacketSequenceNumber(1); DataPacket d2=new DataPacket(); d2.setPacketSequenceNumber(2); + DataPacket d3=new DataPacket(); + d3.setPacketSequenceNumber(3); + q.offer(d3); q.offer(d2); q.offer(d1); q.offer(control); @@ -116,16 +107,28 @@ UDTPacket p1=q.poll(); assertTrue(p1.isControlPacket()); - UDTPacket p2=q.poll(); - assertFalse(p2.isControlPacket()); + UDTPacket p=q.poll(); + assertFalse(p.isControlPacket()); //check ordering by sequence number - assertEquals(1,p2.getPacketSequenceNumber()); + assertEquals(1,p.getPacketSequenceNumber()); - UDTPacket p3=q.poll(); - assertFalse(p3.isControlPacket()); - assertEquals(2,p3.getPacketSequenceNumber()); + DataPacket d=new DataPacket(); + d.setPacketSequenceNumber(54); + q.offer(d); + p=q.poll(); + assertFalse(p.isControlPacket()); + assertEquals(2,p.getPacketSequenceNumber()); + p=q.poll(); + assertFalse(p.isControlPacket()); + assertEquals(3,p.getPacketSequenceNumber()); + + p=q.poll(); + assertFalse(p.isControlPacket()); + assertEquals(54,p.getPacketSequenceNumber()); + + } } Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-28 14:51:57 UTC (rev 28) @@ -17,7 +17,8 @@ Thread.sleep(500); }while(!serverStarted); - File f=new File("src/test/java/datafile"); + //File f=new File("src/test/java/datafile"); + File f=new File("/tmp/100MB"); File tmp=File.createTempFile("udtest-", null); Modified: udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2010-04-28 14:51:57 UTC (rev 28) @@ -29,7 +29,6 @@ //set an artificial loss rate public void testWithLoss()throws Exception{ UDTReceiver.dropRate=3; - UDTReceiver.connectionExpiryDisabled=true; TIMEOUT=Integer.MAX_VALUE; num_packets=512; //set log level @@ -40,7 +39,6 @@ //send even more data public void testLargeDataSet()throws Exception{ UDTReceiver.dropRate=0; - UDTReceiver.connectionExpiryDisabled=true; TIMEOUT=Integer.MAX_VALUE; num_packets=3*1024; //set log level Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-28 14:51:57 UTC (rev 28) @@ -21,7 +21,7 @@ boolean running=false; //how many - int num_packets=100; + int num_packets=50; //how large is a single packet int size=1*1024*1024; @@ -35,7 +35,6 @@ //System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName()); UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; - //UDTReceiver.connectionExpiryDisabled=true; doTest(); } @@ -56,12 +55,12 @@ MessageDigest digest=MessageDigest.getInstance("MD5"); while(!serverRunning)Thread.sleep(100); long start=System.currentTimeMillis(); - System.out.println("Sending <"+num_packets+"> packets of <"+size/1024/1024+"> Mbytes each"); + System.out.println("Sending <"+num_packets+"> packets of <"+format.format(size/1024.0/1024.0)+"> Mbytes each"); long end=0; if(serverRunning){ for(int i=0;i<num_packets;i++){ long block=System.currentTimeMillis(); - client.sendBlocking(data); + client.send(data); digest.update(data); double took=System.currentTimeMillis()-block; double arrival=client.getStatistics().getPacketArrivalRate(); @@ -71,6 +70,7 @@ + " snd: "+format.format(snd) +" rate: "+format.format(size/(1024*took))+ " MB/sec"); } + client.flush(); end=System.currentTimeMillis(); client.shutdown(); }else throw new IllegalStateException(); Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-28 14:51:57 UTC (rev 28) @@ -7,13 +7,14 @@ import junit.framework.TestCase; import udt.UDPEndPoint; +import udt.util.MeanValue; /** * send some data over a UDP connection and measure performance */ public class UDPTest extends TestCase { - final int num_packets=5*1000; + final int num_packets=10*1000; final int packetSize=UDPEndPoint.DATAGRAM_SIZE; public void test1()throws Exception{ @@ -30,9 +31,12 @@ dp.setAddress(InetAddress.getByName("localhost")); dp.setPort(65321); System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes"); + MeanValue v=new MeanValue(); for(int i=0;i<num_packets;i++){ dp.setData(data); + v.begin(); s.send(dp); + v.end(); } System.out.println("Finished sending."); while(serverRunning)Thread.sleep(10); @@ -41,6 +45,7 @@ System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms"); System.out.println("Rate "+N/1000/(end-start)+" Mbytes/sec"); System.out.println("Rate "+num_packets+" packets/sec"); + System.out.println("Mean send time "+v.getFormattedMean()+" microsec"); System.out.println("Server received: "+total); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2010-05-01 20:49:58
|
Revision: 35 http://udt-java.svn.sourceforge.net/udt-java/?rev=35&view=rev Author: bschuller Date: 2010-05-01 20:49:52 +0000 (Sat, 01 May 2010) Log Message: ----------- nicer stats; stats switchable Modified Paths: -------------- udt-java/trunk/README udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTSocket.java udt-java/trunk/src/main/java/udt/util/MeanValue.java udt-java/trunk/src/main/java/udt/util/SendFile.java udt-java/trunk/src/main/java/udt/util/StatisticsHistoryEntry.java udt-java/trunk/src/main/java/udt/util/UDTStatistics.java udt-java/trunk/src/main/scripts/fufex-recv udt-java/trunk/src/main/scripts/fufex-send udt-java/trunk/src/main/scripts/receive-file udt-java/trunk/src/main/scripts/send-file udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Added Paths: ----------- udt-java/trunk/src/main/java/udt/util/MeanThroughput.java Modified: udt-java/trunk/README =================================================================== --- udt-java/trunk/README 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/README 2010-05-01 20:49:52 UTC (rev 35) @@ -3,13 +3,24 @@ # This is a native Java implementation of the UDT protocol. +The UDT protocol has been developed by Yunhong Gu and +Robert Grossmann from the University of Illinois, and provides +high-speed data transfer with configurable congestion control. +# +# Using UDT-Java +# +UDT-Java can be used as a library for developing your own +applications, and it can be used as-is as a commandline tool for +file transfer. + # -# Sample applications +# File transfer applications # -We provide "send-file" and "receive-file" scripts that work analogously to their C++ counterparts. +We provide "send-file" and "receive-file" scripts that work analogously +to their C++ counterparts. To start a file "server", @@ -19,17 +30,24 @@ bin/receive-file <server_host> <server_port> <remote_filename> <local_filename> +where the <server_host> can be a server name or a numerical IP address. # # Setting up a development environment using Eclipse # +If you want to work with the UDT-Java source code, you need to first +checkout the sourcecode from Sourceforge using Subversion: + + svn checkout http://udt-java.sourceforge.net/svnroot/udt-java/udt-java/trunk udt-java + You will need Apache Maven from http://maven.apache.org To generate Eclipse project files, + cd udt-java mvn eclipse:eclipse -Then, import the project into your workspace. +Then, open Eclipse and import the project into your workspace, using +"Import/Existing projects into workspace...". - \ No newline at end of file Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-05-01 20:49:52 UTC (rev 35) @@ -152,6 +152,8 @@ */ public static boolean connectionExpiryDisabled=false; + private final boolean storeStatistics; + /** * create a receiver with a valid {@link UDTSession} * @param session @@ -169,6 +171,7 @@ largestReceivedSeqNumber=session.getInitialSequenceNumber()-1; bufferSize=session.getReceiveBufferSize(); handoffQueue=new ArrayBlockingQueue<UDTPacket>(4*session.getFlowWindowSize()); + storeStatistics=Boolean.getBoolean("udt.receiver.storeStatistics"); initMetrics(); start(); } @@ -178,6 +181,7 @@ private MeanValue processTime; private MeanValue dataProcessTime; private void initMetrics(){ + if(!storeStatistics)return; dgReceiveInterval=new MeanValue("UDT receive interval"); statistics.addMetric(dgReceiveInterval); dataPacketInterval=new MeanValue("Data packet interval"); @@ -216,9 +220,9 @@ * packets are written by the endpoint */ protected void receive(UDTPacket p)throws IOException{ - dgReceiveInterval.end(); + if(storeStatistics)dgReceiveInterval.end(); handoffQueue.offer(p); - dgReceiveInterval.begin(); + if(storeStatistics)dgReceiveInterval.begin(); } /** @@ -261,9 +265,11 @@ if(needEXPReset){ nextEXP=Util.getCurrentTime()+expTimerInterval; } - processTime.begin(); + if(storeStatistics)processTime.begin(); + processUDTPacket(packet); - processTime.end(); + + if(storeStatistics)processTime.end(); } Thread.yield(); @@ -348,11 +354,15 @@ if(!p.isControlPacket()){ DataPacket dp=(DataPacket)p; - dataPacketInterval.end(); - dataProcessTime.begin(); + if(storeStatistics){ + dataPacketInterval.end(); + dataProcessTime.begin(); + } onDataPacketReceived(dp); - dataProcessTime.end(); - dataPacketInterval.begin(); + if(storeStatistics){ + dataProcessTime.end(); + dataPacketInterval.begin(); + } } else if (p.getControlPacketType()==ControlPacketType.ACK2.ordinal()){ Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-05-01 20:49:52 UTC (rev 35) @@ -50,6 +50,7 @@ import udt.packets.KeepAlive; import udt.packets.NegativeAcknowledgement; import udt.sender.SenderLossList; +import udt.util.MeanThroughput; import udt.util.MeanValue; import udt.util.UDTStatistics; import udt.util.UDTThreadFactory; @@ -114,6 +115,8 @@ //used by the sender to wait for an ACK of a certain sequence number private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); + private final boolean storeStatistics; + public UDTSender(UDTSession session,UDPEndPoint endpoint){ if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); this.endpoint= endpoint; @@ -125,17 +128,22 @@ lastAckSequenceNumber=session.getInitialSequenceNumber(); waitForAckLatch.set(new CountDownLatch(1)); waitForSeqAckLatch.set(new CountDownLatch(1)); + storeStatistics=Boolean.getBoolean("udt.sender.storeStatistics"); initMetrics(); doStart(); } private MeanValue dgSendTime; private MeanValue dgSendInterval; + private MeanThroughput throughput; private void initMetrics(){ + if(!storeStatistics)return; dgSendTime=new MeanValue("Datagram send time"); statistics.addMetric(dgSendTime); dgSendInterval=new MeanValue("Datagram send interval"); statistics.addMetric(dgSendInterval); + throughput=new MeanThroughput("Throughput", session.getDatagramSize()); + statistics.addMetric(throughput); } /** @@ -182,11 +190,17 @@ */ private void send(DataPacket p)throws IOException{ synchronized(sendLock){ - dgSendInterval.end(); - dgSendTime.begin(); + if(storeStatistics){ + dgSendInterval.end(); + dgSendTime.begin(); + } endpoint.doSend(p); - dgSendTime.end(); - dgSendInterval.begin(); + if(storeStatistics){ + dgSendTime.end(); + dgSendInterval.begin(); + throughput.end(); + throughput.begin(); + } sendBuffer.put(p.getPacketSequenceNumber(), p); unacknowledged.incrementAndGet(); } @@ -261,7 +275,7 @@ //send ACK2 packet to the receiver sendAck2(ackNumber); statistics.incNumberOfACKReceived(); - statistics.storeParameters(); + if(storeStatistics)statistics.storeParameters(); } /** @@ -275,8 +289,7 @@ session.getCongestionControl().onLoss(nak.getDecodedLossInfo()); session.getSocket().getReceiver().resetEXPTimer(); statistics.incNumberOfNAKReceived(); - statistics.storeParameters(); - + if(logger.isLoggable(Level.FINER)){ logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, " +"set send period to "+session.getCongestionControl().getSendInterval()); @@ -303,9 +316,7 @@ /** * sender algorithm */ - MeanValue v=new MeanValue("Wait for Ack time: "); public void senderAlgorithm()throws InterruptedException, IOException{ - statistics.addMetric(v); while(!paused){ long iterationStart=Util.getCurrentTime(); //last packet send time? @@ -338,9 +349,7 @@ if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){ statistics.incNumberOfCCWindowExceededEvents(); } - v.begin(); waitForAck(); - v.end(); } } @@ -356,6 +365,7 @@ x++; } passed=Util.getCurrentTime()-iterationStart; + if(stopped)return; } } } Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-05-01 20:49:52 UTC (rev 35) @@ -204,6 +204,7 @@ sender.waitForAck(seqNo); } } + sender.pause(); } //writes and wait for ack Added: udt-java/trunk/src/main/java/udt/util/MeanThroughput.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/MeanThroughput.java (rev 0) +++ udt-java/trunk/src/main/java/udt/util/MeanThroughput.java 2010-05-01 20:49:52 UTC (rev 35) @@ -0,0 +1,30 @@ +package udt.util; + + +/** + * holds a floating mean value + */ +public class MeanThroughput extends MeanValue{ + + private final double packetSize; + + public MeanThroughput(String name, int packetSize){ + this(name, false, 64, packetSize); + } + + public MeanThroughput(String name, boolean verbose, int packetSize){ + this(name, verbose, 64, packetSize); + } + + public MeanThroughput(String name, boolean verbose, int nValue, int packetSize){ + super(name,verbose,nValue); + this.packetSize=packetSize; + } + + @Override + public double getMean() { + return packetSize/super.getMean(); + } + + +} Property changes on: udt-java/trunk/src/main/java/udt/util/MeanThroughput.java ___________________________________________________________________ Added: svn:mime-type + text/plain Modified: udt-java/trunk/src/main/java/udt/util/MeanValue.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-05-01 20:49:52 UTC (rev 35) @@ -1,6 +1,7 @@ package udt.util; import java.text.NumberFormat; +import java.util.Locale; /** * holds a floating mean value @@ -31,20 +32,20 @@ } public MeanValue(String name, boolean verbose, int nValue){ - format=NumberFormat.getNumberInstance(); + format=NumberFormat.getNumberInstance(Locale.ENGLISH); format.setMaximumFractionDigits(2); + format.setGroupingUsed(false); this.verbose=verbose; this.nValue=nValue; this.name=name; - begin(); } public void addValue(double value){ mean=(mean*n+value)/(n+1); n++; if(verbose && n % nValue == 0){ - if(msg!=null)System.out.print(msg+" "); - System.out.println(getFormattedMean()); + if(msg!=null)System.out.println(msg+" "+getFormattedMean()); + else System.out.println(getFormattedMean()); } } @@ -53,7 +54,7 @@ } public String getFormattedMean(){ - return format.format(mean); + return format.format(getMean()); } public void clear(){ @@ -66,7 +67,7 @@ } public void end(){ - addValue(Util.getCurrentTime()-start); + if(start>0)addValue(Util.getCurrentTime()-start); } public void end(String msg){ this.msg=msg; @@ -76,4 +77,8 @@ public String getName(){ return name; } + + public String toString(){ + return name; + } } Modified: udt-java/trunk/src/main/java/udt/util/SendFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-05-01 20:49:52 UTC (rev 35) @@ -157,8 +157,11 @@ System.out.println(socket.getSession().getStatistics().toString()); double rate=1000.0*size/1024/1024/(end-start); System.out.println("[SendFile] Rate: "+format.format(rate)+" MBytes/sec. "+format.format(8*rate)+" MBit/sec."); - socket.getSession().getStatistics().writeParameterHistory(new File("udtstats-"+System.currentTimeMillis()+".csv")); + if(Boolean.getBoolean("udt.sender.storeStatistics")){ + socket.getSession().getStatistics().writeParameterHistory(new File("udtstats-"+System.currentTimeMillis()+".csv")); + } }finally{ + socket.getSender().stop(); fis.close(); } logger.info("Finished request from "+socket.getSession().getDestination()); Modified: udt-java/trunk/src/main/java/udt/util/StatisticsHistoryEntry.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/StatisticsHistoryEntry.java 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/main/java/udt/util/StatisticsHistoryEntry.java 2010-05-01 20:49:52 UTC (rev 35) @@ -1,5 +1,7 @@ package udt.util; +import java.util.List; + public class StatisticsHistoryEntry { private final Object[] values; @@ -14,6 +16,26 @@ this.timestamp=time; } + public StatisticsHistoryEntry(boolean heading, long time, List<MeanValue>metrics){ + this.isHeading=heading; + this.timestamp=time; + int length=metrics.size(); + if(isHeading)length++; + Object[]metricValues=new Object[length]; + if(isHeading){ + metricValues[0]="time"; + for(int i=0;i<metrics.size();i++){ + metricValues[i+1]=metrics.get(i).getName(); + } + } + else{ + for(int i=0;i<metricValues.length;i++){ + metricValues[i]=metrics.get(i).getFormattedMean(); + } + } + this.values=metricValues; + } + public StatisticsHistoryEntry(Object ...values){ this(false,System.currentTimeMillis(),values); } @@ -26,12 +48,12 @@ if(!isHeading){ sb.append(timestamp); for(Object val: values){ - sb.append(",").append(String.valueOf(val)); + sb.append(" , ").append(String.valueOf(val)); } } else{ for(int i=0;i<values.length;i++){ - if(i>0)sb.append(","); + if(i>0)sb.append(" , "); sb.append(String.valueOf(values[i])); } } Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-05-01 20:49:52 UTC (rev 35) @@ -42,7 +42,6 @@ /** * This class is used to keep some statistics about a UDT connection. - * It also allows to compute a MD5 hash over the received data */ public class UDTStatistics { @@ -226,10 +225,10 @@ synchronized (statsHistory) { if(first){ first=false; - statsHistory.add(new StatisticsHistoryEntry(true,0,"time","packetRate","sendPeriod")); + statsHistory.add(new StatisticsHistoryEntry(true,0,metrics)); initialTime=System.currentTimeMillis(); } - statsHistory.add(new StatisticsHistoryEntry(false,System.currentTimeMillis()-initialTime,packetArrivalRate,sendPeriod)); + statsHistory.add(new StatisticsHistoryEntry(false,System.currentTimeMillis()-initialTime,metrics)); } } Modified: udt-java/trunk/src/main/scripts/fufex-recv =================================================================== --- udt-java/trunk/src/main/scripts/fufex-recv 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/main/scripts/fufex-recv 2010-05-01 20:49:52 UTC (rev 35) @@ -36,5 +36,5 @@ #memory for the VM MEM=-Xmx128m -$JAVA $MEM -cp $CP udt.unicore.FufexReceive $* +$JAVA $MEM $OPTS -cp $CP udt.unicore.FufexReceive $* Modified: udt-java/trunk/src/main/scripts/fufex-send =================================================================== --- udt-java/trunk/src/main/scripts/fufex-send 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/main/scripts/fufex-send 2010-05-01 20:49:52 UTC (rev 35) @@ -37,5 +37,5 @@ #memory for the VM MEM=-Xmx128m -$JAVA $MEM -cp $CP udt.unicore.FufexSend $* +$JAVA $MEM $OPTS -cp $CP udt.unicore.FufexSend $* Modified: udt-java/trunk/src/main/scripts/receive-file =================================================================== --- udt-java/trunk/src/main/scripts/receive-file 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/main/scripts/receive-file 2010-05-01 20:49:52 UTC (rev 35) @@ -1,6 +1,15 @@ #!/bin/sh # +# Start script for the UDT-Java send-file application +# Usage: receive-file <server-address> <server-port> <remote-file> <local-file> +# [--localIP=<local ip>] +# [--localPort=<local port>] +# [--verbose] +# +# + +# #Installation Directory # dir=`dirname $0` @@ -24,6 +33,12 @@ # #INST= +# +# Alternative CC class +# +#OPTS=$OPTS" -Dudt.congestioncontrol.class=udt.cc.SimpleTCP" + +#Java command to use JAVA=java #location of udt.jar @@ -32,5 +47,5 @@ #memory for the VM MEM=-Xmx128m -$JAVA $MEM -cp $CP udt.util.ReceiveFile $* +$JAVA $MEM $OPTS -cp $CP udt.util.ReceiveFile $* Modified: udt-java/trunk/src/main/scripts/send-file =================================================================== --- udt-java/trunk/src/main/scripts/send-file 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/main/scripts/send-file 2010-05-01 20:49:52 UTC (rev 35) @@ -1,6 +1,13 @@ #!/bin/sh # +# Start script for the UDT-Java send-file application +# Usage: send-file <server-port> +# [--localIP=<local ip>] +# [--verbose] +# + +# #Installation Directory # dir=`dirname $0` @@ -24,6 +31,19 @@ # #INST= +# +# Alternative CC class +# +#OPTS=$OPTS" -Dudt.congestioncontrol.class=udt.cc.SimpleTCP" + +# +# Collect and store some performance data (e.g. throughput) +# (data will be collected in memory and stored after send finishes +# to a udtstats-<NNNN>.csv file) +#OPTS=$OPTS" -Dudt.sender.storeStatistics=true" + + +#which Java command to use JAVA=java #location of udt.jar @@ -32,5 +52,5 @@ #memory for the VM MEM=-Xmx128m -$JAVA $MEM -cp $CP udt.util.SendFile $* +$JAVA $MEM $OPTS -cp $CP udt.util.SendFile $* Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-30 21:31:25 UTC (rev 34) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-05-01 20:49:52 UTC (rev 35) @@ -21,7 +21,7 @@ boolean running=false; //how many - int num_packets=500; + int num_packets=50; //how large is a single packet int size=1*1024*1024; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2012-02-03 06:57:07
|
Revision: 69 http://udt-java.svn.sourceforge.net/udt-java/?rev=69&view=rev Author: bschuller Date: 2012-02-03 06:57:00 +0000 (Fri, 03 Feb 2012) Log Message: ----------- use Junit4 Modified Paths: -------------- udt-java/trunk/LICENSE udt-java/trunk/pom.xml udt-java/trunk/src/test/java/echo/TestEchoServer.java udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java udt-java/trunk/src/test/java/udt/TestList.java udt-java/trunk/src/test/java/udt/TestReceiverLossList.java udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java udt-java/trunk/src/test/java/udt/TestUDTInputStream.java udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java udt-java/trunk/src/test/java/udt/UDTTestBase.java udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java udt-java/trunk/src/test/java/udt/performance/TCPTest.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java Modified: udt-java/trunk/LICENSE =================================================================== --- udt-java/trunk/LICENSE 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/LICENSE 2012-02-03 06:57:00 UTC (rev 69) @@ -1,5 +1,5 @@ /********************************************************************************* - * Copyright (c) 2010 Forschungszentrum Juelich GmbH + * Copyright (c) 2010-2012 Forschungszentrum Juelich GmbH * All rights reserved. * * Redistribution and use in source and binary forms, with or without Modified: udt-java/trunk/pom.xml =================================================================== --- udt-java/trunk/pom.xml 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/pom.xml 2012-02-03 06:57:00 UTC (rev 69) @@ -27,7 +27,7 @@ <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> - <version>3.8.1</version> + <version>4.8.2</version> <scope>test</scope> </dependency> </dependencies> Modified: udt-java/trunk/src/test/java/echo/TestEchoServer.java =================================================================== --- udt-java/trunk/src/test/java/echo/TestEchoServer.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/echo/TestEchoServer.java 2012-02-03 06:57:00 UTC (rev 69) @@ -4,12 +4,16 @@ import java.io.PrintWriter; import java.net.InetAddress; -import junit.framework.TestCase; +import junit.framework.Assert; + +import org.junit.Test; + import udt.UDTClient; import udt.util.Util; -public class TestEchoServer extends TestCase { +public class TestEchoServer { + @Test public void test1()throws Exception{ EchoServer es=new EchoServer(65321); es.start(); @@ -22,9 +26,9 @@ System.out.println("Message sent."); client.getInputStream().setBlocking(false); String line=Util.readLine(client.getInputStream()); - assertNotNull(line); + Assert.assertNotNull(line); System.out.println(line); - assertEquals("test",line); + Assert.assertEquals("test",line); } } Modified: udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java =================================================================== --- udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java 2012-02-03 06:57:00 UTC (rev 69) @@ -4,12 +4,16 @@ import java.io.PrintWriter; import java.net.InetAddress; -import junit.framework.TestCase; +import junit.framework.Assert; + +import org.junit.Test; + import udt.UDTClient; import udt.util.Util; -public class TestEchoServerMultiClient extends TestCase { +public class TestEchoServerMultiClient { + @Test public void testTwoClients()throws Exception{ EchoServer es=new EchoServer(65321); es.start(); @@ -33,8 +37,8 @@ System.out.println("Message sent."); client.getInputStream().setBlocking(false); String line=Util.readLine(client.getInputStream()); - assertNotNull(line); + Assert.assertNotNull(line); System.out.println(line); - assertEquals("test",line); + Assert.assertEquals("test",line); } } Modified: udt-java/trunk/src/test/java/udt/TestList.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestList.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestList.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,8 +1,13 @@ package udt; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; -import junit.framework.TestCase; +import org.junit.Test; + import udt.packets.DataPacket; import udt.packets.KeepAlive; import udt.receiver.AckHistoryEntry; @@ -15,8 +20,9 @@ /* * tests for the various list and queue classes */ -public class TestList extends TestCase{ +public class TestList { + @Test public void testCircularArray(){ CircularArray<Integer>c=new CircularArray<Integer>(5); for(int i=0;i<5;i++)c.add(i); @@ -50,14 +56,14 @@ for(int i=0;i<values.length;i++){ p.add(values[i]); } - assertEquals(4.0d, p.computeMedianTimeInterval()); + assertEquals(4.0d, p.computeMedianTimeInterval(), 0.001d); long[] arrivaltimes = {12, 12, 12, 12}; PacketPairWindow p1=new PacketPairWindow(16); for(int i=0;i<values.length;i++){ p1.add(arrivaltimes[i]); } - assertEquals(12.0d, p1.computeMedianTimeInterval()); + assertEquals(12.0d, p1.computeMedianTimeInterval(), 0.001d); } Modified: udt-java/trunk/src/test/java/udt/TestReceiverLossList.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestReceiverLossList.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestReceiverLossList.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,11 +1,15 @@ package udt; +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + import udt.receiver.ReceiverLossList; import udt.receiver.ReceiverLossListEntry; -import junit.framework.TestCase; -public class TestReceiverLossList extends TestCase { +public class TestReceiverLossList { + @Test public void test1(){ ReceiverLossList l=new ReceiverLossList(); ReceiverLossListEntry e1=new ReceiverLossListEntry(1); Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,8 +1,12 @@ package udt; +import static org.junit.Assert.assertEquals; + import java.io.File; import java.io.FileInputStream; +import org.junit.Test; + import udt.util.ReceiveFile; import udt.util.SendFile; import udt.util.UDTThreadFactory; @@ -11,6 +15,7 @@ volatile boolean serverStarted=false; + @Test public void test1()throws Exception{ runServer(); do{ Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-02-03 06:57:00 UTC (rev 69) @@ -6,10 +6,17 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import junit.framework.Assert; + +import org.junit.Test; + import udt.util.Util; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestUDTInputStream extends UDTTestBase{ + @Test public void test1()throws Exception{ UDTInputStream is=new UDTInputStream(null); byte[] data1="this is ".getBytes(); @@ -25,6 +32,7 @@ assertEquals(digest,readMD5); } + @Test public void test2()throws Exception{ UDTInputStream is=new UDTInputStream(null); byte[] data1=getRandomData(65537); @@ -40,6 +48,7 @@ assertEquals(digest,readMD5); } + @Test public void testInOrder()throws Exception{ UDTInputStream is=new UDTInputStream(null); is.setBlocking(false); @@ -57,6 +66,7 @@ assertEquals(digest,readMD5); } + @Test public void testRandomOrder()throws Exception{ UDTInputStream is=new UDTInputStream(null); is.setBlocking(false); @@ -76,7 +86,7 @@ } - + @Test public void testLargeDataSetTwoThreads()throws Exception{ final UDTInputStream is=new UDTInputStream(null); is.setBlocking(false); @@ -98,7 +108,7 @@ is.noMoreData(); }catch(Exception e){ e.printStackTrace(); - fail(); + Assert.fail(); } } }; Modified: udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,11 +1,16 @@ package udt; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + import java.net.InetAddress; import java.security.MessageDigest; import java.util.Random; import java.util.logging.Level; import java.util.logging.Logger; +import org.junit.Test; + import udt.util.Util; public class TestUDTServerSocket extends UDTTestBase{ @@ -18,6 +23,7 @@ int TIMEOUT=20000; + @Test public void testWithoutLoss()throws Exception{ Logger.getLogger("udt").setLevel(Level.WARNING); UDTReceiver.dropRate=0; @@ -27,6 +33,7 @@ } //set an artificial loss rate + @Test public void testWithLoss()throws Exception{ UDTReceiver.dropRate=3; TIMEOUT=Integer.MAX_VALUE; @@ -37,6 +44,7 @@ } //send even more data + @Test public void testLargeDataSet()throws Exception{ UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; @@ -97,6 +105,7 @@ Runnable serverProcess=new Runnable(){ public void run(){ try{ + System.out.println("Starting server."); long start=System.currentTimeMillis(); UDTSocket s=serverSocket.accept(); assertNotNull(s); @@ -121,7 +130,6 @@ } catch(Exception e){ e.printStackTrace(); - fail(); serverRunning=false; } } Modified: udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,14 +1,20 @@ package udt; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.net.DatagramPacket; import java.net.InetAddress; import java.util.logging.Level; import java.util.logging.Logger; +import org.junit.Test; + import udt.packets.Destination; public class TestUdpEndpoint extends UDTTestBase{ + @Test public void testClientServerMode()throws Exception{ //select log level @@ -41,6 +47,7 @@ * just check how fast we can send out UDP packets from the endpoint * @throws Exception */ + @Test public void testRawSendRate()throws Exception{ Logger.getLogger("udt").setLevel(Level.WARNING); System.out.println("Checking raw UDP send rate..."); @@ -65,11 +72,12 @@ Thread.sleep(1000); } - //no rendezvous yet... - public void x_testRendezvousConnect()throws Exception{ + //@Test() + public void testRendezvousConnect()throws Exception{ } + @Test public void testBindToAnyPort()throws Exception{ UDPEndPoint ep=new UDPEndPoint(InetAddress.getByName("localhost")); int port=ep.getLocalPort(); Modified: udt-java/trunk/src/test/java/udt/UDTTestBase.java =================================================================== --- udt-java/trunk/src/test/java/udt/UDTTestBase.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/UDTTestBase.java 2012-02-03 06:57:00 UTC (rev 69) @@ -7,12 +7,10 @@ import udt.util.Util; -import junit.framework.TestCase; - /** * some additional utilities useful for testing */ -public abstract class UDTTestBase extends TestCase{ +public abstract class UDTTestBase { //get an array filled with random data protected byte[] getRandomData(int size){ Modified: udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java =================================================================== --- udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,18 +1,22 @@ package udt.packets; -import udt.packets.ControlPacket; +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + import udt.packets.ControlPacket.ControlPacketType; -import junit.framework.TestCase; -public class TestControlPacketType extends TestCase { +public class TestControlPacketType { + @Test public void testSequenceNumber1(){ ControlPacket p=new DummyControlPacket(); byte[]x=p.getHeader(); byte highest=x[0]; assertEquals(128, highest & 0x80); } - + + @Test public void testControlPacketTypes(){ ControlPacketType t=ControlPacketType.CONNECTION_HANDSHAKE; assertEquals(0,t.ordinal()); Modified: udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java =================================================================== --- udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,10 +1,11 @@ package udt.packets; -import junit.framework.TestCase; -import udt.packets.DataPacket; +import static org.junit.Assert.assertEquals; -public class TestDataPacket extends TestCase { +import org.junit.Test; +public class TestDataPacket { + @Test public void testSequenceNumber1(){ DataPacket p=new DataPacket(); p.setPacketSequenceNumber(1); @@ -17,6 +18,7 @@ assertEquals(1, lowest); } + @Test public void testEncoded(){ DataPacket p=new DataPacket(); p.setPacketSequenceNumber(1); @@ -30,9 +32,9 @@ System.out.println("String s = " + s); } - + + @Test public void testDecode1(){ - DataPacket testPacket1=new DataPacket(); testPacket1.setPacketSequenceNumber(127); testPacket1.setDestinationID(1); @@ -74,6 +76,7 @@ } + @Test public void testEncodeDecode1(){ DataPacket dp=new DataPacket(); dp.setPacketSequenceNumber(127); Modified: udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java =================================================================== --- udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,24 +1,21 @@ package udt.packets; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.util.ArrayList; import java.util.List; -import junit.framework.TestCase; +import org.junit.Test; + import udt.UDTPacket; -import udt.packets.Acknowledgement; -import udt.packets.Acknowledgment2; -import udt.packets.ConnectionHandshake; -import udt.packets.DataPacket; -import udt.packets.MessageDropRequest; -import udt.packets.NegativeAcknowledgement; -import udt.packets.PacketFactory; -import udt.packets.Shutdown; -public class TestPacketFactory extends TestCase { +public class TestPacketFactory { + @Test public void testData(){ String test="sdjfsdjfldskjflds"; - + byte[]data=test.getBytes(); data[0]=(byte)(data[0] & 0x7f); UDTPacket p=PacketFactory.createPacket(data); @@ -27,14 +24,14 @@ assertTrue(p instanceof DataPacket); assertEquals(test,t); } - - + + @Test public void testConnectionHandshake(){ ConnectionHandshake p1 = new ConnectionHandshake(); p1.setMessageNumber(9876); p1.setTimeStamp(3456); p1.setDestinationID(1); - + p1.setConnectionType(1); p1.setSocketType(1); p1.setInitialSeqNo(321); @@ -42,16 +39,17 @@ p1.setMaxFlowWndSize(128); p1.setSocketID(1); p1.setUdtVersion(4); - - + + byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); ConnectionHandshake p2=(ConnectionHandshake)p; assertEquals(p1,p2); - + } - + + @Test public void testAcknowledgement(){ Acknowledgement p1 = new Acknowledgement(); p1.setAckSequenceNumber(1234); @@ -64,28 +62,30 @@ p1.setPacketReceiveRate(1000); p1.setRoundTripTime(1000); p1.setRoundTripTimeVar(500); - + byte[]p1_data=p1.getEncoded(); UDTPacket p=PacketFactory.createPacket(p1_data); Acknowledgement p2=(Acknowledgement)p; assertEquals(p1,p2); } - + + @Test public void testAcknowledgementOfAcknowledgement(){ Acknowledgment2 p1 = new Acknowledgment2(); p1.setAckSequenceNumber(1230); p1.setMessageNumber(9871); p1.setTimeStamp(3451); p1.setDestinationID(1); - + byte[]p1_data=p1.getEncoded(); UDTPacket p=PacketFactory.createPacket(p1_data); Acknowledgment2 p2=(Acknowledgment2)p; assertEquals(p1,p2); - - + + } - + + @Test public void testNegativeAcknowledgement(){ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); @@ -95,15 +95,16 @@ p1.addLossInfo(6); p1.addLossInfo(7, 10); byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); NegativeAcknowledgement p2=(NegativeAcknowledgement)p; assertEquals(p1,p2); - + assertEquals((Integer)5, (Integer)p2.getDecodedLossInfo().get(0)); assertEquals(6, p2.getDecodedLossInfo().size()); } - + + @Test public void testNegativeAcknowledgement2(){ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); @@ -116,18 +117,19 @@ loss.add(8l); loss.add(9l); loss.add(11l); - + p1.addLossInfo(loss); byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); NegativeAcknowledgement p2=(NegativeAcknowledgement)p; assertEquals(p1,p2); - + assertEquals((Integer)5, (Integer)p2.getDecodedLossInfo().get(0)); assertEquals(6, p2.getDecodedLossInfo().size()); } + @Test public void testNegativeAcknowledgement3(){ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); @@ -137,42 +139,43 @@ p1.addLossInfo(6); p1.addLossInfo(147, 226); byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); NegativeAcknowledgement p2=(NegativeAcknowledgement)p; assertEquals(p1,p2); - - + + } - - public void testShutdown(){ + + @Test + public void testShutdown(){ Shutdown p1 = new Shutdown(); p1.setMessageNumber(9874); p1.setTimeStamp(3453); p1.setDestinationID(3); - - + + byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); Shutdown p2=(Shutdown)p; assertEquals(p1,p2); } - - - + + + @Test public void testMessageDropRequest(){ MessageDropRequest p1=new MessageDropRequest(); p1.setMessageNumber(9876); p1.setTimeStamp(3456); p1.setDestinationID(4); - + p1.setMsgFirstSeqNo(2); p1.setMsgLastSeqNo(3); - - + + byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); assertTrue(p instanceof MessageDropRequest); MessageDropRequest p2=(MessageDropRequest)p; Modified: udt-java/trunk/src/test/java/udt/performance/TCPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TCPTest.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/performance/TCPTest.java 2012-02-03 06:57:00 UTC (rev 69) @@ -6,17 +6,18 @@ import java.net.Socket; import java.util.Random; -import junit.framework.TestCase; +import org.junit.Test; /** * send some data over a TCP connection and measure performance * */ -public class TCPTest extends TestCase { +public class TCPTest { int BUFSIZE=1024; int num_packets=10*1000; + @Test public void test1()throws Exception{ runServer(); //client socket @@ -59,7 +60,7 @@ } catch(Exception e){ e.printStackTrace(); - fail(); + serverRunning=false; } } }; Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,5 +1,7 @@ package udt.performance; +import static org.junit.Assert.*; + import java.io.File; import java.net.InetAddress; import java.security.MessageDigest; @@ -9,6 +11,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.junit.Test; + import udt.UDTClient; import udt.UDTInputStream; import udt.UDTReceiver; @@ -31,6 +35,7 @@ int READ_BUFFERSIZE=1*1024*1024; + @Test public void test1()throws Exception{ Logger.getLogger("udt").setLevel(Level.INFO); // System.setProperty("udt.receiver.storeStatistics","true"); Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java 2012-02-03 06:57:00 UTC (rev 69) @@ -3,6 +3,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.junit.Test; + import udt.UDTReceiver; import udt.UDTSession; import udt.cc.SimpleTCP; @@ -22,6 +24,8 @@ int READ_BUFFERSIZE=1*1024*1024; + @Override + @Test public void test1()throws Exception{ Logger.getLogger("udt").setLevel(Level.INFO); UDTReceiver.dropRate=0; Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2012-02-03 06:57:00 UTC (rev 69) @@ -7,7 +7,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; -import junit.framework.TestCase; +import org.junit.Test; + import udt.UDPEndPoint; import udt.packets.DataPacket; import udt.util.MeanValue; @@ -15,11 +16,12 @@ /** * send some data over a UDP connection and measure performance */ -public class UDPTest extends TestCase { +public class UDPTest { final int num_packets=10*10*1000; final int packetSize=UDPEndPoint.DATAGRAM_SIZE; + @Test public void test1()throws Exception{ runServer(); runThirdThread(); Modified: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java =================================================================== --- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,12 +1,20 @@ package udt.sender; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import java.util.concurrent.TimeoutException; -import junit.framework.TestCase; +import org.junit.Test; + import udt.packets.DataPacket; -public class TestFlowWindow extends TestCase { +public class TestFlowWindow { + @Test public void testFillWindow()throws InterruptedException, TimeoutException{ FlowWindow fw=new FlowWindow(3, 128); DataPacket p1=fw.getForProducer(); @@ -38,6 +46,7 @@ assertTrue(fw.isEmpty()); } + @Test public void testOverflow()throws InterruptedException, TimeoutException{ FlowWindow fw=new FlowWindow(3, 64); DataPacket p1=fw.getForProducer(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2012-05-25 08:03:15
|
Revision: 74 http://udt-java.svn.sourceforge.net/udt-java/?rev=74&view=rev Author: bschuller Date: 2012-05-25 08:03:03 +0000 (Fri, 25 May 2012) Log Message: ----------- attempt to switch to new UDT-C++ style of connection handshake. DOES NOT WORK yet Modified Paths: -------------- udt-java/trunk/pom.xml udt-java/trunk/src/main/java/udt/ClientSession.java udt-java/trunk/src/main/java/udt/ServerSession.java udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTClient.java udt-java/trunk/src/main/java/udt/UDTInputStream.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTServerSocket.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java udt-java/trunk/src/main/java/udt/packets/PacketFactory.java udt-java/trunk/src/main/java/udt/packets/PacketUtil.java udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/trunk/src/main/java/udt/util/SequenceNumber.java udt-java/trunk/src/test/java/echo/EchoServer.java udt-java/trunk/src/test/java/udt/TestUDTInputStream.java udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java Modified: udt-java/trunk/pom.xml =================================================================== --- udt-java/trunk/pom.xml 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/pom.xml 2012-05-25 08:03:03 UTC (rev 74) @@ -5,7 +5,7 @@ <artifactId>udt-java</artifactId> <packaging>jar</packaging> <name>UDT Java implementation</name> - <version>0.6-SNAPSHOT</version> + <version>0.7-SNAPSHOT</version> <url>http://sourceforge.net/projects/udt-java</url> <developers> <developer> Modified: udt-java/trunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ClientSession.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/ClientSession.java 2012-05-25 08:03:03 UTC (rev 74) @@ -52,6 +52,8 @@ private UDPEndPoint endPoint; + long initialSequenceNo=SequenceNumber.random(); + public ClientSession(UDPEndPoint endPoint, Destination dest)throws SocketException{ super("ClientSession localPort="+endPoint.getLocalPort(),dest); this.endPoint=endPoint; @@ -60,7 +62,7 @@ /** * send connection handshake until a reply from server is received - * TODO check for timeout + * @throws InterruptedException * @throws IOException */ @@ -68,11 +70,21 @@ public void connect() throws InterruptedException,IOException{ int n=0; while(getState()!=ready){ - sendHandShake(); if(getState()==invalid)throw new IOException("Can't connect!"); - n++; - if(getState()!=ready)Thread.sleep(500); + if(getState()<=handshaking){ + setState(handshaking); + sendInitialHandShake(); + } + else if(getState()==handshaking+1){ + sendSecondHandshake(); + } + + if(getState()==invalid)throw new IOException("Can't connect!"); + if(n++ > 10)throw new IOException("Could not connect to server within the timeout."); + + Thread.sleep(500); } + Thread.sleep(1000); cc.init(); logger.info("Connected, "+n+" handshake packets sent"); } @@ -82,38 +94,10 @@ lastPacket=packet; - if (packet instanceof ConnectionHandshake) { + if (packet.isConnectionHandshake()) { ConnectionHandshake hs=(ConnectionHandshake)packet; - - logger.info("Received connection handshake from "+peer+"\n"+hs); - - if (getState()!=ready) { - if(hs.getConnectionType()==1){ - try{ - //TODO validate parameters sent by peer - long peerSocketID=hs.getSocketID(); - destination.setSocketID(peerSocketID); - sendConfirmation(hs); - }catch(Exception ex){ - logger.log(Level.WARNING,"Error creating socket",ex); - setState(invalid); - } - return; - } - else{ - try{ - //TODO validate parameters sent by peer - long peerSocketID=hs.getSocketID(); - destination.setSocketID(peerSocketID); - setState(ready); - socket=new UDTSocket(endPoint,this); - }catch(Exception ex){ - logger.log(Level.WARNING,"Error creating socket",ex); - setState(invalid); - } - return; - } - } + handleConnectionHandshake(hs,peer); + return; } if(getState() == ready) { @@ -140,9 +124,43 @@ } } + protected void handleConnectionHandshake(ConnectionHandshake hs, Destination peer){ - //handshake for connect - protected void sendHandShake()throws IOException{ + if (getState()==handshaking) { + logger.info("Received initial handshake response from "+peer+"\n"+hs); + if(hs.getConnectionType()==ConnectionHandshake.CONNECTION_SERVER_ACK){ + try{ + //TODO validate parameters sent by peer + long peerSocketID=hs.getSocketID(); + sessionCookie=hs.getCookie(); + destination.setSocketID(peerSocketID); + setState(handshaking+1); + }catch(Exception ex){ + logger.log(Level.WARNING,"Error creating socket",ex); + setState(invalid); + } + return; + } + else{ + logger.info("Unexpected type of handshake packet received"); + setState(invalid); + } + } + else if(getState()==handshaking+1){ + try{ + logger.info("Received confirmation handshake response from "+peer+"\n"+hs); + //TODO validate parameters sent by peer + setState(ready); + socket=new UDTSocket(endPoint,this); + }catch(Exception ex){ + logger.log(Level.WARNING,"Error creating socket",ex); + setState(invalid); + } + } + } + + //initial handshake for connect + protected void sendInitialHandShake()throws IOException{ ConnectionHandshake handshake = new ConnectionHandshake(); handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR); handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM); @@ -153,20 +171,24 @@ handshake.setSocketID(mySocketID); handshake.setMaxFlowWndSize(flowWindowSize); handshake.setSession(this); + handshake.setAddress(endPoint.getLocalAddress()); logger.info("Sending "+handshake); endPoint.doSend(handshake); } //2nd handshake for connect - protected void sendConfirmation(ConnectionHandshake hs)throws IOException{ + protected void sendSecondHandshake()throws IOException{ ConnectionHandshake handshake = new ConnectionHandshake(); - handshake.setConnectionType(-1); + handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR); handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM); - handshake.setInitialSeqNo(hs.getInitialSeqNo()); - handshake.setPacketSize(hs.getPacketSize()); + handshake.setInitialSeqNo(initialSequenceNo); + handshake.setPacketSize(getDatagramSize()); handshake.setSocketID(mySocketID); handshake.setMaxFlowWndSize(flowWindowSize); handshake.setSession(this); + handshake.setCookie(sessionCookie); + handshake.setAddress(endPoint.getLocalAddress()); + handshake.setDestinationID(getDestination().getSocketID()); logger.info("Sending confirmation "+handshake); endPoint.doSend(handshake); } Modified: udt-java/trunk/src/main/java/udt/ServerSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ServerSession.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/ServerSession.java 2012-05-25 08:03:03 UTC (rev 74) @@ -33,7 +33,6 @@ package udt; import java.io.IOException; -import java.net.DatagramPacket; import java.net.SocketException; import java.net.UnknownHostException; import java.util.logging.Level; @@ -43,6 +42,7 @@ import udt.packets.Destination; import udt.packets.KeepAlive; import udt.packets.Shutdown; +import udt.util.SequenceNumber; /** * server side session in client-server mode @@ -56,10 +56,10 @@ //last received packet (for testing purposes) private UDTPacket lastPacket; - public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{ - super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new Destination(dp.getAddress(),dp.getPort())); + public ServerSession(Destination peer, UDPEndPoint endPoint)throws SocketException,UnknownHostException{ + super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+peer.getAddress()+":"+peer.getPort(),peer); this.endPoint=endPoint; - logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort()); + logger.info("Created "+toString()+" talking to "+peer.getAddress()+":"+peer.getPort()); } int n_handshake=0; @@ -68,79 +68,45 @@ public void received(UDTPacket packet, Destination peer){ lastPacket=packet; - if(packet instanceof ConnectionHandshake) { - ConnectionHandshake connectionHandshake=(ConnectionHandshake)packet; - logger.info("Received "+connectionHandshake); + if(packet.isConnectionHandshake()) { + handleHandShake((ConnectionHandshake)packet); + return; + } - if (getState()<=ready){ - destination.setSocketID(connectionHandshake.getSocketID()); - - if(getState()<=handshaking){ - setState(handshaking); - } - try{ - handleHandShake(connectionHandshake); - n_handshake++; - try{ - setState(ready); - socket=new UDTSocket(endPoint, this); - cc.init(); - }catch(Exception uhe){ - //session is invalid - logger.log(Level.SEVERE,"",uhe); - setState(invalid); - } - }catch(IOException ex){ - //session invalid - logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex); - setState(invalid); - } - return; - } - - }else if(packet instanceof KeepAlive) { + if(packet instanceof KeepAlive) { socket.getReceiver().resetEXPTimer(); active = true; return; } - if(getState()== ready) { - active = true; - - if (packet instanceof KeepAlive) { - //nothing to do here - return; - }else if (packet instanceof Shutdown) { - try{ - socket.getReceiver().stop(); - }catch(IOException ex){ - logger.log(Level.WARNING,"",ex); - } - setState(shutdown); - System.out.println("SHUTDOWN ***"); - active = false; - logger.info("Connection shutdown initiated by the other side."); - return; + if (packet instanceof Shutdown) { + try{ + socket.getReceiver().stop(); + }catch(IOException ex){ + logger.log(Level.WARNING,"",ex); } + setState(shutdown); + active = false; + logger.info("Connection shutdown initiated by peer."); + return; + } - else{ - try{ - if(packet.forSender()){ - socket.getSender().receive(packet); - }else{ - socket.getReceiver().receive(packet); - } - }catch(Exception ex){ - //session invalid - logger.log(Level.SEVERE,"",ex); - setState(invalid); + if(getState() == ready) { + active = true; + try{ + if(packet.forSender()){ + socket.getSender().receive(packet); + }else{ + socket.getReceiver().receive(packet); } + }catch(Exception ex){ + //session invalid + logger.log(Level.SEVERE,"",ex); + setState(invalid); } return; - } - } /** @@ -151,16 +117,73 @@ } /** - * handle the connection handshake:<br/> - * <ul> - * <li>set initial sequence number</li> - * <li>send response handshake</li> - * </ul> + * reply to a connection handshake message + * @param connectionHandshake + */ + protected void handleHandShake(ConnectionHandshake connectionHandshake){ + logger.info("Received "+connectionHandshake + " in state <"+getState()+">"); + if(getState()==ready){ + //just send confirmation packet again + try{ + sendFinalHandShake(connectionHandshake); + }catch(IOException io){} + return; + } + + if (getState()<ready){ + destination.setSocketID(connectionHandshake.getSocketID()); + + if(getState()<handshaking){ + setState(handshaking); + } + + try{ + n_handshake++; + boolean handShakeComplete=handleSecondHandShake(connectionHandshake); + if(handShakeComplete){ + logger.info("Client/Server handshake complete!"); + setState(ready); + socket=new UDTSocket(endPoint, this); + cc.init(); + } + }catch(IOException ex){ + //session invalid + logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex); + setState(invalid); + } + } + } + + private ConnectionHandshake finalConnectionHandshake; + + /** + * handle the connection handshake + * * @param handshake * @param peer * @throws IOException */ - protected void handleHandShake(ConnectionHandshake handshake)throws IOException{ + protected boolean handleSecondHandShake(ConnectionHandshake handshake)throws IOException{ + if(sessionCookie==0){ + ackInitialHandshake(handshake); + //need one more handshake + return false; + } + + long otherCookie=handshake.getCookie(); + if(sessionCookie!=otherCookie){ + setState(invalid); + throw new IOException("Invalid cookie <"+otherCookie+"> received, my cookie is <"+sessionCookie+">"); + } + sendFinalHandShake(handshake); + return true; + } + + /* + * response after the initial connection handshake received: + * compute cookie + */ + protected void ackInitialHandshake(ConnectionHandshake handshake)throws IOException{ ConnectionHandshake responseHandshake = new ConnectionHandshake(); //compare the packet size and choose minimun long clientBufferSize=handshake.getPacketSize(); @@ -178,12 +201,40 @@ responseHandshake.setSocketID(mySocketID); responseHandshake.setDestinationID(this.getDestination().getSocketID()); responseHandshake.setSession(this); + sessionCookie=SequenceNumber.random(); + responseHandshake.setCookie(sessionCookie); + responseHandshake.setAddress(endPoint.getLocalAddress()); logger.info("Sending reply "+responseHandshake); endPoint.doSend(responseHandshake); } + protected void sendFinalHandShake(ConnectionHandshake handshake)throws IOException{ + if(finalConnectionHandshake==null){ + finalConnectionHandshake= new ConnectionHandshake(); + //compare the packet size and choose minimun + long clientBufferSize=handshake.getPacketSize(); + long myBufferSize=getDatagramSize(); + long bufferSize=Math.min(clientBufferSize, myBufferSize); + long initialSequenceNumber=handshake.getInitialSeqNo(); + setInitialSequenceNumber(initialSequenceNumber); + setDatagramSize((int)bufferSize); + finalConnectionHandshake.setPacketSize(bufferSize); + finalConnectionHandshake.setUdtVersion(4); + finalConnectionHandshake.setInitialSeqNo(initialSequenceNumber); + finalConnectionHandshake.setConnectionType(-1); + finalConnectionHandshake.setMaxFlowWndSize(handshake.getMaxFlowWndSize()); + //tell peer what the socket ID on this side is + finalConnectionHandshake.setSocketID(mySocketID); + finalConnectionHandshake.setDestinationID(this.getDestination().getSocketID()); + finalConnectionHandshake.setSession(this); + finalConnectionHandshake.setCookie(sessionCookie); + finalConnectionHandshake.setAddress(endPoint.getLocalAddress()); + } + logger.info("Sending final handshake ack "+finalConnectionHandshake); + endPoint.doSend(finalConnectionHandshake); + } } Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2012-05-25 08:03:03 UTC (rev 74) @@ -40,6 +40,8 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.SynchronousQueue; @@ -70,10 +72,12 @@ //last received packet private UDTPacket lastPacket; + private final Map<Destination,UDTSession> sessionsBeingConnected=Collections.synchronizedMap(new HashMap<Destination,UDTSession>()); + //if the endpoint is configured for a server socket, //this queue is used to handoff new UDTSessions to the application private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>(); - + private boolean serverSocketMode=false; //has the endpoint been stopped? @@ -90,7 +94,7 @@ this.dgSocket=socket; port=dgSocket.getLocalPort(); } - + /** * bind to any local port on the given host address * @param localAddress @@ -116,7 +120,7 @@ } if(localPort>0)this.port = localPort; else port=dgSocket.getLocalPort(); - + configureSocket(); } @@ -127,7 +131,7 @@ dgSocket.setReceiveBufferSize(128*1024); dgSocket.setReuseAddress(false); } - + /** * bind to the default network interface on the machine * @@ -240,79 +244,75 @@ * </ul> * @throws IOException */ - private long lastDestID=-1; - private UDTSession lastSession; - - private int n=0; - - private final Object lock=new Object(); - protected void doReceive()throws IOException{ while(!stopped){ try{ - try{ - - //will block until a packet is received or timeout has expired - dgSocket.receive(dp); - - Destination peer=new Destination(dp.getAddress(), dp.getPort()); - int l=dp.getLength(); - UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); - lastPacket=packet; + //will block until a packet is received or timeout has expired + dgSocket.receive(dp); - //handle connection handshake - if(packet.isConnectionHandshake()){ - synchronized(lock){ - Long id=Long.valueOf(packet.getDestinationID()); - UDTSession session=sessions.get(id); - if(session==null){ - session=new ServerSession(dp,this); - addSession(session.getSocketID(),session); - //TODO need to check peer to avoid duplicate server session - if(serverSocketMode){ - logger.fine("Pooling new request."); - sessionHandoff.put(session); - logger.fine("Request taken for processing."); - } - } - peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); - session.received(packet,peer); - } - } - else{ - //dispatch to existing session - long dest=packet.getDestinationID(); - UDTSession session; - if(dest==lastDestID){ - session=lastSession; - } - else{ - session=sessions.get(dest); - lastSession=session; - lastDestID=dest; - } - if(session==null){ - n++; - if(n%100==1){ - logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); - } - } - else{ - session.received(packet,peer); - } - } - }catch(SocketException ex){ - logger.log(Level.INFO, "SocketException: "+ex.getMessage()); - }catch(SocketTimeoutException ste){ - //can safely ignore... we will retry until the endpoint is stopped + Destination peer=new Destination(dp.getAddress(), dp.getPort()); + int l=dp.getLength(); + UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); + lastPacket=packet; + + long dest=packet.getDestinationID(); + UDTSession session=sessions.get(dest); + if(session!=null){ + //dispatch to existing session + session.received(packet,peer); } - + else if(packet.isConnectionHandshake()){ + connectionHandshake((ConnectionHandshake)packet, peer); + } + else{ + logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); + } + }catch(SocketException ex){ + logger.log(Level.INFO, "SocketException: "+ex.getMessage()); + }catch(SocketTimeoutException ste){ + //can safely ignore... we will retry until the endpoint is stopped }catch(Exception ex){ logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); } } } + /** + * called when a "connection handshake" packet was received and no + * matching session yet exists + * + * @param packet + * @param peer + * @throws IOException + * @throws InterruptedException + */ + protected synchronized void connectionHandshake(ConnectionHandshake packet, Destination peer)throws IOException, InterruptedException{ + Destination p=new Destination(peer.getAddress(),peer.getPort()); + UDTSession session=sessionsBeingConnected.get(peer); + long destID=packet.getDestinationID(); + if(session!=null && session.getSocketID()==destID){ + //confirmation handshake + sessionsBeingConnected.remove(p); + addSession(destID, session); + } + else if(session==null){ + session=new ServerSession(peer,this); + sessionsBeingConnected.put(p,session); + sessions.put(session.getSocketID(), session); + if(serverSocketMode){ + logger.fine("Pooling new request."); + sessionHandoff.put(session); + logger.fine("Request taken for processing."); + } + } + else { + throw new IOException("dest ID sent by client does not match"); + } + Long peerSocketID=((ConnectionHandshake)packet).getSocketID(); + peer.setSocketID(peerSocketID); + session.received(packet,peer); + } + protected void doSend(UDTPacket packet)throws IOException{ byte[]data=packet.getEncoded(); DatagramPacket dgp = packet.getSession().getDatagram(); @@ -327,4 +327,5 @@ public void sendRaw(DatagramPacket p)throws IOException{ dgSocket.send(p); } + } Modified: udt-java/trunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTClient.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTClient.java 2012-05-25 08:03:03 UTC (rev 74) @@ -80,12 +80,11 @@ //create client session... clientSession=new ClientSession(clientEndpoint,destination); clientEndpoint.addSession(clientSession.getSocketID(), clientSession); - clientEndpoint.start(); clientSession.connect(); //wait for handshake while(!clientSession.isReady()){ - Thread.sleep(5); + Thread.sleep(50); } logger.info("The UDTClient is connected"); } Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2012-05-25 08:03:03 UTC (rev 74) @@ -78,9 +78,19 @@ @Override public int read()throws IOException{ int b=0; - while(b==0) + while(b==0){ b=read(single); - + if(b==0){ + try{ + while(receiveBuffer.isEmpty()){ + Thread.sleep(20); + } + }catch(InterruptedException ie){ + throw new IOException(ie); + } + } + } + if(b>0){ return single[0] & 0xFF; } @@ -153,9 +163,7 @@ else currentChunk=receiveBuffer.poll(10, TimeUnit.MILLISECONDS); }catch(InterruptedException ie){ - IOException ex=new IOException(); - ex.initCause(ie); - throw ex; + throw new IOException(ie); } return; } Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-05-25 08:03:03 UTC (rev 74) @@ -196,9 +196,14 @@ //starts the sender algorithm private void start(){ + Runnable r=new Runnable(){ public void run(){ try{ + while(session.getSocket()==null)Thread.sleep(100); + session.getSocket().getInputStream(); + + logger.info("STARTING RECEIVER for "+session); nextACK=Util.getCurrentTime()+ackTimerInterval; nextNAK=(long)(Util.getCurrentTime()+1.5*nakTimerInterval); nextEXP=Util.getCurrentTime()+2*expTimerInterval; @@ -224,6 +229,9 @@ */ protected void receive(UDTPacket p)throws IOException{ if(storeStatistics)dgReceiveInterval.end(); + if(!p.isControlPacket()){ + System.out.println("++ "+p+" queuesize="+handoffQueue.size()); + } handoffQueue.offer(p); if(storeStatistics)dgReceiveInterval.begin(); } @@ -265,6 +273,7 @@ needEXPReset=true; } } + if(needEXPReset){ nextEXP=Util.getCurrentTime()+expTimerInterval; } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2012-05-25 08:03:03 UTC (rev 74) @@ -151,7 +151,7 @@ * start the sender thread */ public void start(){ - logger.info("Starting sender for "+session); + logger.info("STARTING SENDER for "+session); startLatch.countDown(); started=true; } Modified: udt-java/trunk/src/main/java/udt/UDTServerSocket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTServerSocket.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTServerSocket.java 2012-05-25 08:03:03 UTC (rev 74) @@ -31,6 +31,7 @@ *********************************************************************************/ package udt; + import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; @@ -38,8 +39,8 @@ import java.util.logging.Logger; - public class UDTServerSocket { + private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); private final UDPEndPoint endpoint; Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2012-05-25 08:03:03 UTC (rev 74) @@ -39,6 +39,7 @@ import java.util.logging.Logger; import udt.packets.Destination; +import udt.util.SequenceNumber; import udt.util.UDTStatistics; public abstract class UDTSession { @@ -53,9 +54,9 @@ //state constants public static final int start=0; public static final int handshaking=1; - public static final int ready=2; - public static final int keepalive=3; - public static final int shutdown=4; + public static final int ready=50; + public static final int keepalive=80; + public static final int shutdown=90; public static final int invalid=99; @@ -70,6 +71,9 @@ //cache dgPacket (peer stays the same always) private DatagramPacket dgPacket; + //session cookie created during handshake + protected long sessionCookie=0; + /** * flow window size, i.e. how many data packets are * in-flight at a single time @@ -209,7 +213,7 @@ public synchronized long getInitialSequenceNumber(){ if(initialSequenceNumber==null){ - initialSequenceNumber=1l; //TODO must be random? + initialSequenceNumber=SequenceNumber.random(); } return initialSequenceNumber; } Modified: udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2012-05-25 08:03:03 UTC (rev 74) @@ -33,6 +33,8 @@ package udt.packets; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; import udt.UDTSession; @@ -49,21 +51,29 @@ private long packetSize; private long maxFlowWndSize; - public static final long CONNECTION_TYPE_REGULAR=1; + public static final long CONNECTION_TYPE_REGULAR=1L; - public static final long CONNECTION_TYPE_RENDEZVOUS=0; + public static final long CONNECTION_TYPE_RENDEZVOUS=0L; + /** + * connection type in response handshake packet + */ + public static final long CONNECTION_SERVER_ACK=-1L; + private long connectionType = CONNECTION_TYPE_REGULAR;//regular or rendezvous mode private long socketID; private long cookie=0; + //address of the UDP socket + private InetAddress address; + public ConnectionHandshake(){ this.controlPacketType=ControlPacketType.CONNECTION_HANDSHAKE.ordinal(); } - public ConnectionHandshake(byte[]controlInformation){ + public ConnectionHandshake(byte[]controlInformation)throws IOException{ this(); decode(controlInformation); } @@ -73,7 +83,7 @@ return true; } - void decode(byte[]data){ + void decode(byte[]data)throws IOException{ udtVersion =PacketUtil.decode(data, 0); socketType=PacketUtil.decode(data, 4); initialSeqNo=PacketUtil.decode(data, 8); @@ -81,9 +91,9 @@ maxFlowWndSize=PacketUtil.decode(data, 16); connectionType=PacketUtil.decode(data, 20); socketID=PacketUtil.decode(data, 24); - if(data.length>28){ - cookie=PacketUtil.decode(data, 28); - } + cookie=PacketUtil.decode(data, 28); + //TODO ipv6 check + address=PacketUtil.decodeInetAddress(data, 32, false); } public long getUdtVersion() { @@ -134,11 +144,25 @@ public void setSocketID(long socketID) { this.socketID = socketID; } + public long getCookie() { + return cookie; + } + public void setCookie(long cookie) { + this.cookie = cookie; + } + public InetAddress getAddress() { + return address; + } + + public void setAddress(InetAddress address) { + this.address = address; + } + @Override public byte[] encodeControlInformation(){ try { - ByteArrayOutputStream bos=new ByteArrayOutputStream(24); + ByteArrayOutputStream bos=new ByteArrayOutputStream(48); bos.write(PacketUtil.encode(udtVersion)); bos.write(PacketUtil.encode(socketType)); bos.write(PacketUtil.encode(initialSeqNo)); @@ -146,6 +170,8 @@ bos.write(PacketUtil.encode(maxFlowWndSize)); bos.write(PacketUtil.encode(connectionType)); bos.write(PacketUtil.encode(socketID)); + bos.write(PacketUtil.encode(cookie)); + bos.write(PacketUtil.encode(address)); return bos.toByteArray(); } catch (Exception e) { // can't happen @@ -178,6 +204,10 @@ return false; if (udtVersion != other.udtVersion) return false; + if (cookie!=other.cookie) + return false; + if (!address.equals(other.address)) + return false; return true; } @@ -198,6 +228,7 @@ sb.append(", socketType=").append(socketType); sb.append(", destSocketID=").append(destinationID); if(cookie>0)sb.append(", cookie=").append(cookie); + sb.append(", address=").append(address); sb.append("]"); return sb.toString(); } Modified: udt-java/trunk/src/main/java/udt/packets/PacketFactory.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2012-05-25 08:03:03 UTC (rev 74) @@ -31,6 +31,8 @@ *********************************************************************************/ package udt.packets; +import java.io.IOException; + import udt.UDTPacket; import udt.packets.ControlPacket.*; @@ -42,13 +44,13 @@ * @param packetData * @return */ - public static UDTPacket createPacket(byte[]encodedData){ + public static UDTPacket createPacket(byte[]encodedData)throws IOException{ boolean isControl=(encodedData[0]&128) !=0 ; if(isControl)return createControlPacket(encodedData,encodedData.length); return new DataPacket(encodedData); } - public static UDTPacket createPacket(byte[]encodedData,int length){ + public static UDTPacket createPacket(byte[]encodedData,int length)throws IOException{ boolean isControl=(encodedData[0]&128) !=0 ; if(isControl)return createControlPacket(encodedData,length); return new DataPacket(encodedData,length); @@ -59,7 +61,7 @@ * @param packetData * @return */ - public static ControlPacket createControlPacket(byte[]encodedData,int length){ + public static ControlPacket createControlPacket(byte[]encodedData,int length)throws IOException{ ControlPacket packet=null; Modified: udt-java/trunk/src/main/java/udt/packets/PacketUtil.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2012-05-25 08:03:03 UTC (rev 74) @@ -32,7 +32,10 @@ package udt.packets; +import java.net.InetAddress; +import java.net.UnknownHostException; + public class PacketUtil { public static byte[]encode(long value){ @@ -80,4 +83,30 @@ return result; } + /** + * encodes the specified address into 128 bit + * @param address - inet address + */ + public static byte[] encode(InetAddress address){ + byte[]res=new byte[16]; + byte[]add=address.getAddress(); + System.arraycopy(add, 0, res, 0, add.length); + return res; + } + + public static InetAddress decodeInetAddress(byte[]data, int start, boolean ipV6)throws UnknownHostException{ + InetAddress result=null; + byte[] add=ipV6?new byte[16]:new byte[4]; + System.arraycopy(data, start, add, 0, add.length); + result=InetAddress.getByAddress(add); + return result; + } + + public static void print(byte[]arr){ + System.out.print("["); + for(byte b: arr){ + System.out.print(" "+(b&0xFF)); + } + System.out.println(" ]"); + } } Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2012-05-25 08:03:03 UTC (rev 74) @@ -54,7 +54,9 @@ try{ long seq=data.getSequenceNumber(); //if already have this chunk, discard it - if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0)return true; + if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0){ + return true; + } //else compute insert position int offset=(int)SequenceNumber.seqOffset(initialSequenceNumber, seq); int insert=offset% size; Modified: udt-java/trunk/src/main/java/udt/util/SequenceNumber.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/SequenceNumber.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/util/SequenceNumber.java 2012-05-25 08:03:03 UTC (rev 74) @@ -13,7 +13,7 @@ private final static long maxSequenceNo=0x7FFFFFFF; - + private final static Random rand=new Random(); /** * compare seq1 and seq2. Returns zero, if they are equal, a negative value if seq1 is smaller than * seq2, and a positive value if seq1 is larger than seq2. @@ -67,7 +67,7 @@ * generates a random number between 1 and 0x3FFFFFFF (inclusive) */ public static long random(){ - return 1+new Random().nextInt(maxOffset); + return 1+rand.nextInt(maxOffset); } } Modified: udt-java/trunk/src/test/java/echo/EchoServer.java =================================================================== --- udt-java/trunk/src/test/java/echo/EchoServer.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/test/java/echo/EchoServer.java 2012-05-25 08:03:03 UTC (rev 74) @@ -78,7 +78,7 @@ String line=readLine(in); if(line!=null){ System.out.println("ECHO: "+line); - //else echo back the line + //echo back the line writer.println(line); writer.flush(); } Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-05-25 08:03:03 UTC (rev 74) @@ -101,7 +101,6 @@ try{ for(int i=0;i<blocks.length;i++){ while(!is.haveNewData(i+1, blocks[i])){ - Thread.yield(); Thread.sleep(100); } } Modified: udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java =================================================================== --- udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-05-25 08:03:03 UTC (rev 74) @@ -3,17 +3,20 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.net.InetAddress; import java.util.ArrayList; import java.util.List; import org.junit.Test; import udt.UDTPacket; +import udt.util.SequenceNumber; public class TestPacketFactory { @Test - public void testData(){ + public void testData()throws IOException{ String test="sdjfsdjfldskjflds"; byte[]data=test.getBytes(); @@ -26,7 +29,7 @@ } @Test - public void testConnectionHandshake(){ + public void testConnectionHandshake()throws IOException{ ConnectionHandshake p1 = new ConnectionHandshake(); p1.setMessageNumber(9876); p1.setTimeStamp(3456); @@ -39,8 +42,9 @@ p1.setMaxFlowWndSize(128); p1.setSocketID(1); p1.setUdtVersion(4); - - + p1.setAddress(InetAddress.getLocalHost()); + p1.setCookie(SequenceNumber.random()); + byte[]p1_data=p1.getEncoded(); UDTPacket p=PacketFactory.createPacket(p1_data); @@ -50,7 +54,7 @@ } @Test - public void testAcknowledgement(){ + public void testAcknowledgement()throws IOException{ Acknowledgement p1 = new Acknowledgement(); p1.setAckSequenceNumber(1234); p1.setMessageNumber(9876); @@ -70,7 +74,7 @@ } @Test - public void testAcknowledgementOfAcknowledgement(){ + public void testAcknowledgementOfAcknowledgement()throws IOException{ Acknowledgment2 p1 = new Acknowledgment2(); p1.setAckSequenceNumber(1230); p1.setMessageNumber(9871); @@ -86,7 +90,7 @@ } @Test - public void testNegativeAcknowledgement(){ + public void testNegativeAcknowledgement()throws IOException{ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); p1.setTimeStamp(3452); @@ -105,7 +109,7 @@ } @Test - public void testNegativeAcknowledgement2(){ + public void testNegativeAcknowledgement2()throws IOException{ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); p1.setTimeStamp(3452); @@ -130,7 +134,7 @@ } @Test - public void testNegativeAcknowledgement3(){ + public void testNegativeAcknowledgement3()throws IOException{ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); p1.setTimeStamp(3452); @@ -148,13 +152,12 @@ } @Test - public void testShutdown(){ + public void testShutdown()throws IOException{ Shutdown p1 = new Shutdown(); p1.setMessageNumber(9874); p1.setTimeStamp(3453); p1.setDestinationID(3); - byte[]p1_data=p1.getEncoded(); UDTPacket p=PacketFactory.createPacket(p1_data); @@ -164,7 +167,7 @@ @Test - public void testMessageDropRequest(){ + public void testMessageDropRequest()throws Exception{ MessageDropRequest p1=new MessageDropRequest(); p1.setMessageNumber(9876); p1.setTimeStamp(3456); @@ -181,5 +184,15 @@ MessageDropRequest p2=(MessageDropRequest)p; assertEquals(p1,p2); } + + @Test + public void testPacketUtil()throws Exception{ + InetAddress i=InetAddress.getLocalHost(); + byte[]enc=PacketUtil.encode(i); + PacketUtil.print(enc); + InetAddress i2=PacketUtil.decodeInetAddress(enc, 0, false); + System.out.println(i2); + assertEquals(i, i2); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |