[Udt-java-commits] SF.net SVN: udt-java:[23] udt-java/trunk
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2010-04-22 10:07:00
|
Revision: 23
http://udt-java.svn.sourceforge.net/udt-java/?rev=23&view=rev
Author: bschuller
Date: 2010-04-22 10:06:54 +0000 (Thu, 22 Apr 2010)
Log Message:
-----------
Modified Paths:
--------------
udt-java/trunk/README
udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
udt-java/trunk/src/main/java/udt/UDTPacket.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/packets/ControlPacket.java
udt-java/trunk/src/main/java/udt/packets/DataPacket.java
udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java
udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java
udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/main/java/udt/util/SendFile.java
udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
udt-java/trunk/src/main/java/udt/util/Util.java
udt-java/trunk/src/test/java/udt/TestList.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
Modified: udt-java/trunk/README
===================================================================
--- udt-java/trunk/README 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/README 2010-04-22 10:06:54 UTC (rev 23)
@@ -17,7 +17,7 @@
To download a file from the server,
- bin/receive_file <server_host> <server_port> <remote_filename> <local_filename>
+ bin/receive-file <server_host> <server_port> <remote_filename> <local_filename>
#
Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -21,7 +21,7 @@
private final UDTStatistics statistics;
//round trip time in microseconds
- private long roundTripTime=2*Util.getSYNTime();
+ private long roundTripTime=0;
//rate in packets per second
private long packetArrivalRate=0;
@@ -124,7 +124,6 @@
if(slowStartPhase){
congestionWindowSize+=ackSeqno-lastAckSeqNumber;
lastAckSeqNumber = ackSeqno;
-
//but not beyond a maximum size
if(congestionWindowSize>session.getFlowWindowSize()){
System.out.println("slow start ends on ACK");
@@ -163,7 +162,6 @@
double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD());
packetSendingPeriod=factor*packetSendingPeriod;
//packetSendingPeriod=0.995*packetSendingPeriod;
- //System.out.println("dec snd factor "+factor+" to "+packetSendingPeriod);
statistics.setSendPeriod(packetSendingPeriod);
}
@@ -173,7 +171,7 @@
//see spec page 16
private double computeNumOfIncreasingPacket (){
- //difference in link capacity and sending speed, in packets per second
+ //difference between link capacity and sending speed, in packets per second
double remaining=estimatedLinkCapacity-1000000.0/packetSendingPeriod;
if(remaining<=0){
Modified: udt-java/trunk/src/main/java/udt/UDTPacket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTPacket.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/UDTPacket.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -32,7 +32,7 @@
package udt;
-public interface UDTPacket {
+public interface UDTPacket extends Comparable<UDTPacket>{
public long getMessageNumber();
@@ -67,4 +67,7 @@
public boolean isConnectionHandshake();
public UDTSession getSession();
+
+ public long getPacketSequenceNumber();
+
}
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -33,8 +33,8 @@
package udt;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -110,7 +110,7 @@
long packetArrivalSpeed;
//round trip time, calculated from ACK/ACK2 pairs
- long roundTripTime=50*1000;
+ long roundTripTime=0;
//round trip time variance
long roundTripTimeVar=roundTripTime/2;
@@ -135,8 +135,8 @@
//buffer size for storing data
private final long bufferSize;
- //stores packets to be sent
- private final BlockingQueue<UDTPacket>handoffQueue=new ArrayBlockingQueue<UDTPacket>(32);
+ //stores received packets to be sent
+ private final BlockingQueue<UDTPacket>handoffQueue;
private Thread receiverThread;
@@ -162,10 +162,14 @@
packetHistoryWindow = new PacketHistoryWindow(16);
receiverLossList = new ReceiverLossList();
packetPairWindow = new PacketPairWindow(16);
- nextACK=Util.getCurrentTime()+ACK_INTERVAL;
- nextNAK=(long)(Util.getCurrentTime()+1.5*NAK_INTERVAL);
- nextEXP=Util.getCurrentTime()+2*EXP_INTERVAL;
+ largestReceivedSeqNumber=session.getInitialSequenceNumber()-1;
bufferSize=session.getReceiveBufferSize();
+
+ //incoming packets are ordered by sequence number, with control packets having
+ //preference over data packets
+ handoffQueue=//new ArrayBlockingQueue<UDTPacket>(session.getFlowWindowSize());
+ new PriorityBlockingQueue<UDTPacket>(session.getFlowWindowSize());
+
start();
}
@@ -174,6 +178,9 @@
Runnable r=new Runnable(){
public void run(){
try{
+ nextACK=Util.getCurrentTime()+ACK_INTERVAL;
+ nextNAK=(long)(Util.getCurrentTime()+1.5*NAK_INTERVAL);
+ nextEXP=Util.getCurrentTime()+2*EXP_INTERVAL;
while(!stopped){
receiverAlgorithm();
}
@@ -191,6 +198,8 @@
/*
* packets are written by the endpoint
*/
+ long i=0;
+ long mean=0;
protected void receive(UDTPacket p)throws IOException{
handoffQueue.offer(p);
}
@@ -416,7 +425,6 @@
protected void sendNAK(List<Long>sequenceNumbers)throws IOException{
if(sequenceNumbers.size()==0)return;
NegativeAcknowledgement nAckPacket= new NegativeAcknowledgement();
-
nAckPacket.addLossInfo(sequenceNumbers);
nAckPacket.setSession(session);
nAckPacket.setDestinationID(session.getDestination().getSocketID());
@@ -486,7 +494,6 @@
if(roundTripTime>0)roundTripTime = (roundTripTime*7 + rtt)/8;
else roundTripTime = rtt;
roundTripTimeVar = (roundTripTimeVar* 3 + Math.abs(roundTripTimeVar- rtt)) / 4;
-
ACK_INTERVAL=4*roundTripTime+roundTripTimeVar+Util.getSYNTime();
NAK_INTERVAL=ACK_INTERVAL;
statistics.setRTT(roundTripTime, roundTripTimeVar);
Modified: udt-java/trunk/src/main/java/udt/packets/ControlPacket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -191,6 +191,14 @@
this.session = session;
}
+ public long getPacketSequenceNumber(){
+ return -1;
+ }
+
+ public int compareTo(UDTPacket other){
+ return (int)(getPacketSequenceNumber()-other.getPacketSequenceNumber());
+ }
+
public static enum ControlPacketType {
CONNECTION_HANDSHAKE,
Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -38,7 +38,7 @@
import udt.UDTPacket;
import udt.UDTSession;
-public class DataPacket implements UDTPacket, Comparable<DataPacket>{
+public class DataPacket implements UDTPacket, Comparable<UDTPacket>{
private byte[] data ;
private long packetSequenceNumber;
@@ -178,9 +178,7 @@
this.session = session;
}
- //Compare data packets by their sequence number
- public int compareTo(DataPacket other){
- return (int)(other.packetSequenceNumber-packetSequenceNumber);
+ public int compareTo(UDTPacket other){
+ return (int)(getPacketSequenceNumber()-other.getPacketSequenceNumber());
}
-
}
Modified: udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/receiver/AckHistoryWindow.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -36,8 +36,6 @@
/**
* a circular array of each sent Ack and the time it is sent out
- *
- *
*/
public class AckHistoryWindow extends CircularArray<AckHistoryEntry>{
@@ -47,20 +45,20 @@
/**
* return the time for the given seq no, or <code>-1 </code> if not known
- * @param seqNo
+ * @param ackNumber
*/
- public long getTime(long seqNo){
+ public long getTime(long ackNumber){
for(AckHistoryEntry obj: circularArray){
- if(obj.getAckNumber()==seqNo){
+ if(obj.getAckNumber()==ackNumber){
return obj.getSentTime();
}
}
return -1;
}
- public AckHistoryEntry getEntry(long seqNo){
+ public AckHistoryEntry getEntry(long ackNumber){
for(AckHistoryEntry obj: circularArray){
- if(obj.getAckNumber()==seqNo){
+ if(obj.getAckNumber()==ackNumber){
return obj;
}
}
Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -49,7 +49,7 @@
private final PriorityBlockingQueue<ReceiverLossListEntry>backingList;
public ReceiverLossList(){
- backingList = new PriorityBlockingQueue<ReceiverLossListEntry>(16);
+ backingList = new PriorityBlockingQueue<ReceiverLossListEntry>(32);
}
public void insert(ReceiverLossListEntry entry){
@@ -94,11 +94,10 @@
*/
public List<Long>getFilteredSequenceNumbers(long RTT, boolean doFeedback){
List<Long>result=new ArrayList<Long>();
- long now=Util.getCurrentTime();
ReceiverLossListEntry[]sorted=backingList.toArray(new ReceiverLossListEntry[0]);
Arrays.sort(sorted);
for(ReceiverLossListEntry e: sorted){
- if( (now-e.getLastFeedbackTime())>e.getK()*RTT){
+ if( (Util.getCurrentTime()-e.getLastFeedbackTime())>e.getK()*RTT){
result.add(e.getSequenceNumber());
if(doFeedback)e.feedback();
}
Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -39,7 +39,7 @@
*/
public class ReceiverLossListEntry implements Comparable<ReceiverLossListEntry> {
- private final long sequenceNumber ;
+ private final long sequenceNumber;
private long lastFeedbacktime;
private long k = 2;
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -44,7 +44,7 @@
* create a new sender lost list
*/
public SenderLossList(){
- backingList = new PriorityBlockingQueue<SenderLossListEntry>(16);
+ backingList = new PriorityBlockingQueue<SenderLossListEntry>(32);
}
public void insert(SenderLossListEntry obj){
Modified: udt-java/trunk/src/main/java/udt/util/SendFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -36,6 +36,7 @@
import java.io.FileInputStream;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.text.NumberFormat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
@@ -60,9 +61,10 @@
//TODO configure pool size
private final ExecutorService threadPool=Executors.newFixedThreadPool(3);
-
+
public SendFile(int serverPort){
this.serverPort=serverPort;
+
}
@Override
@@ -115,8 +117,11 @@
private final UDTSocket socket;
+ private final NumberFormat format=NumberFormat.getNumberInstance();
+
public RequestRunner(UDTSocket socket){
this.socket=socket;
+ format.setMaximumFractionDigits(3);
}
public void run(){
@@ -149,7 +154,8 @@
Util.copy(fis, out, size, true);
long end=System.currentTimeMillis();
System.out.println(socket.getSession().getStatistics().toString());
- System.out.println("[SendFile] Rate: "+1000*size/1024/1024/(end-start)+" MBytes/sec.");
+ double rate=1000.0*size/1024/1024/(end-start);
+ System.out.println("[SendFile] Rate: "+format.format(rate)+" MBytes/sec. "+format.format(8*rate)+" MBit/sec.");
socket.getSession().getStatistics().writeParameterHistory(new File("udtstats-"+System.currentTimeMillis()+".csv"));
}finally{
fis.close();
Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -230,5 +230,5 @@
}
return hexString.toString();
}
-
+
}
Modified: udt-java/trunk/src/main/java/udt/util/Util.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -149,5 +149,5 @@
p.setPort(clientPort);
endpoint.sendRaw(p);
}
-
+
}
Modified: udt-java/trunk/src/test/java/udt/TestList.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestList.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/test/java/udt/TestList.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -1,5 +1,10 @@
package udt;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
import junit.framework.TestCase;
+import udt.packets.DataPacket;
+import udt.packets.KeepAlive;
import udt.receiver.AckHistoryEntry;
import udt.receiver.AckHistoryWindow;
import udt.receiver.PacketHistoryWindow;
@@ -58,33 +63,23 @@
for(int i=0;i<values.length;i++){
p.add(values[i]);
}
- //assertEquals(10.0d, p.computeMedianTimeInterval());
+ assertEquals(4.0d, p.computeMedianTimeInterval());
- System.out.println(p.toString());
- System.out.println("MedianTimeInterval: "+p.computeMedianTimeInterval());
+ long[] arrivaltimes = {12, 12, 12, 12};
+ PacketPairWindow p1=new PacketPairWindow(16);
+ for(int i=0;i<values.length;i++){
+ p1.add(arrivaltimes[i]);
+ }
+ assertEquals(12.0d, p1.computeMedianTimeInterval());
- System.out.println(p.toString());
- System.out.println("MedianTimeInterval: "+p.computeMedianTimeInterval());
-
- //assertEquals(10.0d, p.);
-
- //long[] arrivaltimes = {12, 12, 12, 12};
- //PacketPairWindow p1=new PacketPairWindow(16);
- //for(int i=0;i<values.length;i++){
- // p1.insert(arrivaltimes[i]);
- //}
- //assertEquals(12.0d, p1.computeMedianTimeInterval());
-
}
public void testAckHistoryWindow(){
- AckHistoryEntry ackSeqNrA = new AckHistoryEntry( 0,1,1263465050);
-
+ AckHistoryEntry ackSeqNrA = new AckHistoryEntry(0,1,1263465050);
AckHistoryEntry ackSeqNrB = new AckHistoryEntry(1,2,1263465054);
-
AckHistoryEntry ackSeqNrC = new AckHistoryEntry(2,3,1263465058);
AckHistoryWindow recvWindow = new AckHistoryWindow(3);
@@ -92,10 +87,7 @@
recvWindow.add(ackSeqNrB);
recvWindow.add(ackSeqNrC);
AckHistoryEntry entryA = recvWindow.getEntry(1);
- long storageTimeA = entryA.getSentTime();
- long storageTimeA_ =recvWindow.getTime(1);
- System.out.println("storageTimeA bzw A_ "+storageTimeA+" "+storageTimeA_);
-
+ assertEquals(1263465050, entryA.getSentTime());
}
public void testSenderLossList1(){
@@ -110,4 +102,30 @@
assertEquals(C,oldest);
}
+ public void testReceiverInputQueue(){
+ BlockingQueue<UDTPacket> q=new PriorityBlockingQueue<UDTPacket>(5);
+ UDTPacket control = new KeepAlive();
+ DataPacket d1=new DataPacket();
+ d1.setPacketSequenceNumber(1);
+ DataPacket d2=new DataPacket();
+ d2.setPacketSequenceNumber(2);
+ q.offer(d2);
+ q.offer(d1);
+ q.offer(control);
+
+ UDTPacket p1=q.poll();
+ assertTrue(p1.isControlPacket());
+
+ UDTPacket p2=q.poll();
+ assertFalse(p2.isControlPacket());
+ //check ordering by sequence number
+ assertEquals(1,p2.getPacketSequenceNumber());
+
+ UDTPacket p3=q.poll();
+ assertFalse(p3.isControlPacket());
+ assertEquals(2,p3.getPacketSequenceNumber());
+
+
+ }
+
}
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-21 20:36:36 UTC (rev 22)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-22 10:06:54 UTC (rev 23)
@@ -3,6 +3,7 @@
import java.io.File;
import java.net.InetAddress;
import java.security.MessageDigest;
+import java.text.NumberFormat;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -20,7 +21,7 @@
boolean running=false;
//how many
- int num_packets=100;
+ int num_packets=200;
//how large is a single packet
int size=1*1024*1024;
@@ -38,7 +39,11 @@
doTest();
}
+ private final NumberFormat format=NumberFormat.getNumberInstance();
+
protected void doTest()throws Exception{
+ format.setMaximumFractionDigits(2);
+
if(!running)runServer();
UDTClient client=new UDTClient(InetAddress.getByName("localhost"),12345);
client.connect("localhost", 65321);
@@ -58,7 +63,8 @@
long block=System.currentTimeMillis();
client.sendBlocking(data);
digest.update(data);
- System.out.println("Sent block <"+i+"> in "+(System.currentTimeMillis()-block)+" ms");
+ double took=System.currentTimeMillis()-block;
+ System.out.println("Sent block <"+i+"> in "+took+" ms, rate: "+format.format(size/(1024*took))+ " Mbytes/sec");
}
end=System.currentTimeMillis();
client.shutdown();
@@ -70,7 +76,7 @@
System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms");
double mbytes=N/(end-start)/1024;
double mbit=8*mbytes;
- System.out.println("Rate: "+(int)mbytes+" Mbytes/sec "+(int)mbit+" Mbit/sec");
+ System.out.println("Rate: "+format.format(mbytes)+" Mbytes/sec "+format.format(mbit)+" Mbit/sec");
System.out.println("Server received: "+total);
assertEquals(N,total);
@@ -78,7 +84,7 @@
System.out.println("MD5 hash of data received: "+md5_received);
System.out.println(client.getStatistics());
- //assertEquals(md5_sent,md5_received);
+ assertEquals(md5_sent,md5_received);
//store stat history to csv file
client.getStatistics().writeParameterHistory(File.createTempFile("/udtstats-",".csv"));
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|