[Udt-java-commits] SF.net SVN: udt-java:[23] udt-java/trunk
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2010-04-22 10:07:00
|
Revision: 23 http://udt-java.svn.sourceforge.net/udt-java/?rev=23&view=rev Author: bschuller Date: 2010-04-22 10:06:54 +0000 (Thu, 22 Apr 2010) Log Message: ----------- Modified Paths: -------------- udt-java/trunk/README udt-java/trunk/src/main/java/udt/UDTCongestionControl.java udt-java/trunk/src/main/java/udt/UDTPacket.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/packets/ControlPacket.java udt-java/trunk/src/main/java/udt/packets/DataPacket.java udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/util/SendFile.java udt-java/trunk/src/main/java/udt/util/UDTStatistics.java udt-java/trunk/src/main/java/udt/util/Util.java udt-java/trunk/src/test/java/udt/TestList.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Modified: udt-java/trunk/README =================================================================== --- udt-java/trunk/README 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/README 2010-04-22 10:06:54 UTC (rev 23) @@ -17,7 +17,7 @@ To download a file from the server, - bin/receive_file <server_host> <server_port> <remote_filename> <local_filename> + bin/receive-file <server_host> <server_port> <remote_filename> <local_filename> # Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-22 10:06:54 UTC (rev 23) @@ -21,7 +21,7 @@ private final UDTStatistics statistics; //round trip time in microseconds - private long roundTripTime=2*Util.getSYNTime(); + private long roundTripTime=0; //rate in packets per second private long packetArrivalRate=0; @@ -124,7 +124,6 @@ if(slowStartPhase){ congestionWindowSize+=ackSeqno-lastAckSeqNumber; lastAckSeqNumber = ackSeqno; - //but not beyond a maximum size if(congestionWindowSize>session.getFlowWindowSize()){ System.out.println("slow start ends on ACK"); @@ -163,7 +162,6 @@ double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD()); packetSendingPeriod=factor*packetSendingPeriod; //packetSendingPeriod=0.995*packetSendingPeriod; - //System.out.println("dec snd factor "+factor+" to "+packetSendingPeriod); statistics.setSendPeriod(packetSendingPeriod); } @@ -173,7 +171,7 @@ //see spec page 16 private double computeNumOfIncreasingPacket (){ - //difference in link capacity and sending speed, in packets per second + //difference between link capacity and sending speed, in packets per second double remaining=estimatedLinkCapacity-1000000.0/packetSendingPeriod; if(remaining<=0){ Modified: udt-java/trunk/src/main/java/udt/UDTPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTPacket.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/UDTPacket.java 2010-04-22 10:06:54 UTC (rev 23) @@ -32,7 +32,7 @@ package udt; -public interface UDTPacket { +public interface UDTPacket extends Comparable<UDTPacket>{ public long getMessageNumber(); @@ -67,4 +67,7 @@ public boolean isConnectionHandshake(); public UDTSession getSession(); + + public long getPacketSequenceNumber(); + } Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-22 10:06:54 UTC (rev 23) @@ -33,8 +33,8 @@ package udt; import java.io.IOException; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -110,7 +110,7 @@ long packetArrivalSpeed; //round trip time, calculated from ACK/ACK2 pairs - long roundTripTime=50*1000; + long roundTripTime=0; //round trip time variance long roundTripTimeVar=roundTripTime/2; @@ -135,8 +135,8 @@ //buffer size for storing data private final long bufferSize; - //stores packets to be sent - private final BlockingQueue<UDTPacket>handoffQueue=new ArrayBlockingQueue<UDTPacket>(32); + //stores received packets to be sent + private final BlockingQueue<UDTPacket>handoffQueue; private Thread receiverThread; @@ -162,10 +162,14 @@ packetHistoryWindow = new PacketHistoryWindow(16); receiverLossList = new ReceiverLossList(); packetPairWindow = new PacketPairWindow(16); - nextACK=Util.getCurrentTime()+ACK_INTERVAL; - nextNAK=(long)(Util.getCurrentTime()+1.5*NAK_INTERVAL); - nextEXP=Util.getCurrentTime()+2*EXP_INTERVAL; + largestReceivedSeqNumber=session.getInitialSequenceNumber()-1; bufferSize=session.getReceiveBufferSize(); + + //incoming packets are ordered by sequence number, with control packets having + //preference over data packets + handoffQueue=//new ArrayBlockingQueue<UDTPacket>(session.getFlowWindowSize()); + new PriorityBlockingQueue<UDTPacket>(session.getFlowWindowSize()); + start(); } @@ -174,6 +178,9 @@ Runnable r=new Runnable(){ public void run(){ try{ + nextACK=Util.getCurrentTime()+ACK_INTERVAL; + nextNAK=(long)(Util.getCurrentTime()+1.5*NAK_INTERVAL); + nextEXP=Util.getCurrentTime()+2*EXP_INTERVAL; while(!stopped){ receiverAlgorithm(); } @@ -191,6 +198,8 @@ /* * packets are written by the endpoint */ + long i=0; + long mean=0; protected void receive(UDTPacket p)throws IOException{ handoffQueue.offer(p); } @@ -416,7 +425,6 @@ protected void sendNAK(List<Long>sequenceNumbers)throws IOException{ if(sequenceNumbers.size()==0)return; NegativeAcknowledgement nAckPacket= new NegativeAcknowledgement(); - nAckPacket.addLossInfo(sequenceNumbers); nAckPacket.setSession(session); nAckPacket.setDestinationID(session.getDestination().getSocketID()); @@ -486,7 +494,6 @@ if(roundTripTime>0)roundTripTime = (roundTripTime*7 + rtt)/8; else roundTripTime = rtt; roundTripTimeVar = (roundTripTimeVar* 3 + Math.abs(roundTripTimeVar- rtt)) / 4; - ACK_INTERVAL=4*roundTripTime+roundTripTimeVar+Util.getSYNTime(); NAK_INTERVAL=ACK_INTERVAL; statistics.setRTT(roundTripTime, roundTripTimeVar); Modified: udt-java/trunk/src/main/java/udt/packets/ControlPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-04-22 10:06:54 UTC (rev 23) @@ -191,6 +191,14 @@ this.session = session; } + public long getPacketSequenceNumber(){ + return -1; + } + + public int compareTo(UDTPacket other){ + return (int)(getPacketSequenceNumber()-other.getPacketSequenceNumber()); + } + public static enum ControlPacketType { CONNECTION_HANDSHAKE, Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-22 10:06:54 UTC (rev 23) @@ -38,7 +38,7 @@ import udt.UDTPacket; import udt.UDTSession; -public class DataPacket implements UDTPacket, Comparable<DataPacket>{ +public class DataPacket implements UDTPacket, Comparable<UDTPacket>{ private byte[] data ; private long packetSequenceNumber; @@ -178,9 +178,7 @@ this.session = session; } - //Compare data packets by their sequence number - public int compareTo(DataPacket other){ - return (int)(other.packetSequenceNumber-packetSequenceNumber); + public int compareTo(UDTPacket other){ + return (int)(getPacketSequenceNumber()-other.getPacketSequenceNumber()); } - } Modified: udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java 2010-04-22 10:06:54 UTC (rev 23) @@ -36,8 +36,6 @@ /** * a circular array of each sent Ack and the time it is sent out - * - * */ public class AckHistoryWindow extends CircularArray<AckHistoryEntry>{ @@ -47,20 +45,20 @@ /** * return the time for the given seq no, or <code>-1 </code> if not known - * @param seqNo + * @param ackNumber */ - public long getTime(long seqNo){ + public long getTime(long ackNumber){ for(AckHistoryEntry obj: circularArray){ - if(obj.getAckNumber()==seqNo){ + if(obj.getAckNumber()==ackNumber){ return obj.getSentTime(); } } return -1; } - public AckHistoryEntry getEntry(long seqNo){ + public AckHistoryEntry getEntry(long ackNumber){ for(AckHistoryEntry obj: circularArray){ - if(obj.getAckNumber()==seqNo){ + if(obj.getAckNumber()==ackNumber){ return obj; } } Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-22 10:06:54 UTC (rev 23) @@ -49,7 +49,7 @@ private final PriorityBlockingQueue<ReceiverLossListEntry>backingList; public ReceiverLossList(){ - backingList = new PriorityBlockingQueue<ReceiverLossListEntry>(16); + backingList = new PriorityBlockingQueue<ReceiverLossListEntry>(32); } public void insert(ReceiverLossListEntry entry){ @@ -94,11 +94,10 @@ */ public List<Long>getFilteredSequenceNumbers(long RTT, boolean doFeedback){ List<Long>result=new ArrayList<Long>(); - long now=Util.getCurrentTime(); ReceiverLossListEntry[]sorted=backingList.toArray(new ReceiverLossListEntry[0]); Arrays.sort(sorted); for(ReceiverLossListEntry e: sorted){ - if( (now-e.getLastFeedbackTime())>e.getK()*RTT){ + if( (Util.getCurrentTime()-e.getLastFeedbackTime())>e.getK()*RTT){ result.add(e.getSequenceNumber()); if(doFeedback)e.feedback(); } Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-22 10:06:54 UTC (rev 23) @@ -39,7 +39,7 @@ */ public class ReceiverLossListEntry implements Comparable<ReceiverLossListEntry> { - private final long sequenceNumber ; + private final long sequenceNumber; private long lastFeedbacktime; private long k = 2; Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-22 10:06:54 UTC (rev 23) @@ -44,7 +44,7 @@ * create a new sender lost list */ public SenderLossList(){ - backingList = new PriorityBlockingQueue<SenderLossListEntry>(16); + backingList = new PriorityBlockingQueue<SenderLossListEntry>(32); } public void insert(SenderLossListEntry obj){ Modified: udt-java/trunk/src/main/java/udt/util/SendFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-22 10:06:54 UTC (rev 23) @@ -36,6 +36,7 @@ import java.io.FileInputStream; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.text.NumberFormat; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Logger; @@ -60,9 +61,10 @@ //TODO configure pool size private final ExecutorService threadPool=Executors.newFixedThreadPool(3); - + public SendFile(int serverPort){ this.serverPort=serverPort; + } @Override @@ -115,8 +117,11 @@ private final UDTSocket socket; + private final NumberFormat format=NumberFormat.getNumberInstance(); + public RequestRunner(UDTSocket socket){ this.socket=socket; + format.setMaximumFractionDigits(3); } public void run(){ @@ -149,7 +154,8 @@ Util.copy(fis, out, size, true); long end=System.currentTimeMillis(); System.out.println(socket.getSession().getStatistics().toString()); - System.out.println("[SendFile] Rate: "+1000*size/1024/1024/(end-start)+" MBytes/sec."); + double rate=1000.0*size/1024/1024/(end-start); + System.out.println("[SendFile] Rate: "+format.format(rate)+" MBytes/sec. "+format.format(8*rate)+" MBit/sec."); socket.getSession().getStatistics().writeParameterHistory(new File("udtstats-"+System.currentTimeMillis()+".csv")); }finally{ fis.close(); Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-22 10:06:54 UTC (rev 23) @@ -230,5 +230,5 @@ } return hexString.toString(); } - + } Modified: udt-java/trunk/src/main/java/udt/util/Util.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-22 10:06:54 UTC (rev 23) @@ -149,5 +149,5 @@ p.setPort(clientPort); endpoint.sendRaw(p); } - + } Modified: udt-java/trunk/src/test/java/udt/TestList.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestList.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/test/java/udt/TestList.java 2010-04-22 10:06:54 UTC (rev 23) @@ -1,5 +1,10 @@ package udt; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; + import junit.framework.TestCase; +import udt.packets.DataPacket; +import udt.packets.KeepAlive; import udt.receiver.AckHistoryEntry; import udt.receiver.AckHistoryWindow; import udt.receiver.PacketHistoryWindow; @@ -58,33 +63,23 @@ for(int i=0;i<values.length;i++){ p.add(values[i]); } - //assertEquals(10.0d, p.computeMedianTimeInterval()); + assertEquals(4.0d, p.computeMedianTimeInterval()); - System.out.println(p.toString()); - System.out.println("MedianTimeInterval: "+p.computeMedianTimeInterval()); + long[] arrivaltimes = {12, 12, 12, 12}; + PacketPairWindow p1=new PacketPairWindow(16); + for(int i=0;i<values.length;i++){ + p1.add(arrivaltimes[i]); + } + assertEquals(12.0d, p1.computeMedianTimeInterval()); - System.out.println(p.toString()); - System.out.println("MedianTimeInterval: "+p.computeMedianTimeInterval()); - - //assertEquals(10.0d, p.); - - //long[] arrivaltimes = {12, 12, 12, 12}; - //PacketPairWindow p1=new PacketPairWindow(16); - //for(int i=0;i<values.length;i++){ - // p1.insert(arrivaltimes[i]); - //} - //assertEquals(12.0d, p1.computeMedianTimeInterval()); - } public void testAckHistoryWindow(){ - AckHistoryEntry ackSeqNrA = new AckHistoryEntry( 0,1,1263465050); - + AckHistoryEntry ackSeqNrA = new AckHistoryEntry(0,1,1263465050); AckHistoryEntry ackSeqNrB = new AckHistoryEntry(1,2,1263465054); - AckHistoryEntry ackSeqNrC = new AckHistoryEntry(2,3,1263465058); AckHistoryWindow recvWindow = new AckHistoryWindow(3); @@ -92,10 +87,7 @@ recvWindow.add(ackSeqNrB); recvWindow.add(ackSeqNrC); AckHistoryEntry entryA = recvWindow.getEntry(1); - long storageTimeA = entryA.getSentTime(); - long storageTimeA_ =recvWindow.getTime(1); - System.out.println("storageTimeA bzw A_ "+storageTimeA+" "+storageTimeA_); - + assertEquals(1263465050, entryA.getSentTime()); } public void testSenderLossList1(){ @@ -110,4 +102,30 @@ assertEquals(C,oldest); } + public void testReceiverInputQueue(){ + BlockingQueue<UDTPacket> q=new PriorityBlockingQueue<UDTPacket>(5); + UDTPacket control = new KeepAlive(); + DataPacket d1=new DataPacket(); + d1.setPacketSequenceNumber(1); + DataPacket d2=new DataPacket(); + d2.setPacketSequenceNumber(2); + q.offer(d2); + q.offer(d1); + q.offer(control); + + UDTPacket p1=q.poll(); + assertTrue(p1.isControlPacket()); + + UDTPacket p2=q.poll(); + assertFalse(p2.isControlPacket()); + //check ordering by sequence number + assertEquals(1,p2.getPacketSequenceNumber()); + + UDTPacket p3=q.poll(); + assertFalse(p3.isControlPacket()); + assertEquals(2,p3.getPacketSequenceNumber()); + + + } + } Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-21 20:36:36 UTC (rev 22) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-22 10:06:54 UTC (rev 23) @@ -3,6 +3,7 @@ import java.io.File; import java.net.InetAddress; import java.security.MessageDigest; +import java.text.NumberFormat; import java.util.Random; import java.util.logging.Level; import java.util.logging.Logger; @@ -20,7 +21,7 @@ boolean running=false; //how many - int num_packets=100; + int num_packets=200; //how large is a single packet int size=1*1024*1024; @@ -38,7 +39,11 @@ doTest(); } + private final NumberFormat format=NumberFormat.getNumberInstance(); + protected void doTest()throws Exception{ + format.setMaximumFractionDigits(2); + if(!running)runServer(); UDTClient client=new UDTClient(InetAddress.getByName("localhost"),12345); client.connect("localhost", 65321); @@ -58,7 +63,8 @@ long block=System.currentTimeMillis(); client.sendBlocking(data); digest.update(data); - System.out.println("Sent block <"+i+"> in "+(System.currentTimeMillis()-block)+" ms"); + double took=System.currentTimeMillis()-block; + System.out.println("Sent block <"+i+"> in "+took+" ms, rate: "+format.format(size/(1024*took))+ " Mbytes/sec"); } end=System.currentTimeMillis(); client.shutdown(); @@ -70,7 +76,7 @@ System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms"); double mbytes=N/(end-start)/1024; double mbit=8*mbytes; - System.out.println("Rate: "+(int)mbytes+" Mbytes/sec "+(int)mbit+" Mbit/sec"); + System.out.println("Rate: "+format.format(mbytes)+" Mbytes/sec "+format.format(mbit)+" Mbit/sec"); System.out.println("Server received: "+total); assertEquals(N,total); @@ -78,7 +84,7 @@ System.out.println("MD5 hash of data received: "+md5_received); System.out.println(client.getStatistics()); - //assertEquals(md5_sent,md5_received); + assertEquals(md5_sent,md5_received); //store stat history to csv file client.getStatistics().writeParameterHistory(File.createTempFile("/udtstats-",".csv")); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |