[Udt-java-commits] SF.net SVN: udt-java:[21] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2010-04-21 15:45:06
|
Revision: 21
http://udt-java.svn.sourceforge.net/udt-java/?rev=21&view=rev
Author: bschuller
Date: 2010-04-21 15:45:00 +0000 (Wed, 21 Apr 2010)
Log Message:
-----------
rate control stuff --- still not so great IMO
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/ClientSession.java
udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java
udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
Modified: udt-java/trunk/src/main/java/udt/ClientSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/ClientSession.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/main/java/udt/ClientSession.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -107,8 +107,6 @@
}
active = true;
try{
- //packet received means we should not yet expire
- socket.getReceiver().resetEXPTimer();
if(packet.forSender()){
socket.getSender().receive(lastPacket);
}else{
Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -1,12 +1,16 @@
package udt;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import udt.util.UDTStatistics;
import udt.util.Util;
public class UDTCongestionControl implements CongestionControl {
+ private static final Logger logger=Logger.getLogger(UDTCongestionControl.class.getName());
+
private final UDTSession session;
private final UDTStatistics statistics;
@@ -49,14 +53,16 @@
long nACKCount=1;
//number of decreases in a congestion epoch
- long congestionEpochDecreaseCount=1;
+ long decCount=1;
//random threshold on decrease by number of loss events
- long decreaseRandom;
+ long decreaseRandom=1;
//average number of NAKs per congestion
long averageNACKNum;
+ boolean loss=false;
+
public UDTCongestionControl(UDTSession session){
this.session=session;
this.statistics=session.getStatistics();
@@ -115,41 +121,65 @@
* @see udt.CongestionControl#onACK(long)
*/
public void onACK(long ackSeqno){
- //the fixed size of a UDT packet
- long maxSegmentSize=UDPEndPoint.DATAGRAM_SIZE;
-
- //1.if it is in slow start phase,set the congestion window size
- //to the product of packet arrival rate and(rtt +SYN)
- double A=packetArrivalRate*(roundTripTime+Util.getSYNTime());
- //System.out.println("rate "+packetArrivalRate+" rtt "+roundTripTime+" A: "+A);
+ //increase window during slow start
if(slowStartPhase){
- congestionWindowSize=16;
- slowStartPhase=false;
- return;
+ congestionWindowSize+=ackSeqno-lastAckSeqNumber;
+ lastAckSeqNumber = ackSeqno;
+
+ //but not beyond a maximum size
+ if(congestionWindowSize>session.getFlowWindowSize()){
+ System.out.println("slow start ends on ACK");
+ slowStartPhase=false;
+ if(packetArrivalRate>0){
+ packetSendingPeriod=1000000.0/packetArrivalRate;
+ }
+ else{
+ packetSendingPeriod=(double)congestionWindowSize/(roundTripTime+Util.getSYNTimeD());
+ }
+ }
+
}else{
+ //1.if it is not in slow start phase,set the congestion window size
+ //to the product of packet arrival rate and(rtt +SYN)
+ double A=packetArrivalRate/1000000.0*(roundTripTime+Util.getSYNTimeD());
congestionWindowSize=(long)A+16;
+ if(logger.isLoggable(Level.FINER)){
+ logger.finer("receive rate "+packetArrivalRate+" rtt "+roundTripTime+" set to window size: "+(A+16));
+ }
}
+ //no rate increase in slow start
+ if(slowStartPhase)return;
+
+ if(loss){
+ loss=false;
+ return;
+ }
+
//4.compute the number of sent packets to be increase in the next SYN period
//and update the send intervall
- if(estimatedLinkCapacity<= packetArrivalRate){
- numOfIncreasingPacket= 1.0/maxSegmentSize;
- }else{
- numOfIncreasingPacket=computeNumOfIncreasingPacket();
- }
+ numOfIncreasingPacket=computeNumOfIncreasingPacket();
+
//4.update the send period :
double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD());
- packetSendingPeriod=packetSendingPeriod*factor;
+ packetSendingPeriod=factor*packetSendingPeriod;
+ //packetSendingPeriod=0.995*packetSendingPeriod;
+ //System.out.println("dec snd factor "+factor+" to "+packetSendingPeriod);
+
statistics.setSendPeriod(packetSendingPeriod);
}
+ private final long PS=UDPEndPoint.DATAGRAM_SIZE;
+ private final double BetaDivPS=0.0000015/PS;
+
//see spec page 16
- final double BetaDivPS=0.0000015/UDPEndPoint.DATAGRAM_SIZE;
private double computeNumOfIncreasingPacket (){
- long B=estimatedLinkCapacity;
- double C=1.0/packetSendingPeriod;
- if(B<=C)return C;
- long PS=UDPEndPoint.DATAGRAM_SIZE;
+ //link capacity and sending speed, in packets per second
+ double B=estimatedLinkCapacity;
+ double C=1000000.0/packetSendingPeriod;
+
+ if(B<=C)return 1.0/UDPEndPoint.DATAGRAM_SIZE;
+
double exp=Math.ceil(Math.log10((B-C)*PS*8));
double power10 = Math.pow( 10.0, exp)* BetaDivPS;
double inc = Math.max(power10, 1/PS);
@@ -160,49 +190,49 @@
* @see udt.CongestionControl#onNAK(java.util.List)
*/
public void onNAK(List<Integer>lossInfo){
+ loss=true;
long firstBiggestlossSeqNo=lossInfo.get(lossInfo.size()-1);
- long currentMaxSequenceNumber=session.getSocket().getSender().getCurrentSequenceNumber();
- lastAckSeqNumber = currentMaxSequenceNumber;
+
nACKCount++;
/*1) If it is in slow start phase, set inter-packet interval to
1/recvrate. Slow start ends. Stop. */
if(slowStartPhase){
if(packetArrivalRate>0){
- packetSendingPeriod = 1e6/packetArrivalRate;
+ packetSendingPeriod = 100000.0/packetArrivalRate;
}
else{
- packetSendingPeriod=congestionWindowSize*(roundTripTime+Util.getSYNTime());
+ packetSendingPeriod=congestionWindowSize/(roundTripTime+Util.getSYNTime());
}
slowStartPhase = false;
return;
}
-
- //start new congestion epoch
+ long currentMaxSequenceNumber=session.getSocket().getSender().getCurrentSequenceNumber();
+
+ // 2)If this NAK starts a new congestion epoch
if(firstBiggestlossSeqNo>lastDecreaseSeqNo){
- // 2)If this NAK starts a new congestion epoch
+
// -increase inter-packet interval
packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125);
// -Update AvgNAKNum(the average number of NAKs per congestion)
averageNACKNum = (int)Math.ceil(averageNACKNum*0.875 + nACKCount*0.125);
- // -reset NAKCount to 1,
+ // -reset NAKCount and DecCount to 1,
nACKCount=1;
- /* - compute DecRandom to a random (average distribution) number between 1 and AvgNAKNum.. */
+ decCount=1;
+ /* - compute DecRandom to a random (average distribution) number between 1 and AvgNAKNum */
decreaseRandom =(int)Math.ceil((averageNACKNum-1)*Math.random()+1);
// -Update LastDecSeq
lastDecreaseSeqNo = currentMaxSequenceNumber;
// -Stop.
statistics.setSendPeriod(packetSendingPeriod);
- return;
}
//* 3) If DecCount <= 5, and NAKCount == DecCount * DecRandom:
- if(congestionEpochDecreaseCount<=5 &&
- nACKCount==congestionEpochDecreaseCount*decreaseRandom){
+ if(decCount<=5 && nACKCount==decCount*decreaseRandom){
// a. Update SND period: SND = SND * 1.125;
packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125);
// b. Increase DecCount by 1;
- congestionEpochDecreaseCount++;
+ decCount++;
// c. Record the current largest sent sequence number (LastDecSeq).
lastDecreaseSeqNo= currentMaxSequenceNumber;
statistics.setSendPeriod(packetSendingPeriod);
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -110,7 +110,7 @@
long packetArrivalSpeed;
//round trip time, calculated from ACK/ACK2 pairs
- long roundTripTime=10*Util.getSYNTime();
+ long roundTripTime=50*1000;
//round trip time variance
long roundTripTimeVar=roundTripTime/2;
@@ -125,8 +125,13 @@
private long nextEXP;
//microseconds to next EXP event
- private long EXP_INTERVAL=10*Util.getSYNTime();
+ private long EXP_INTERVAL=Util.getSYNTime();
+ //instant when the session was created (for expiry checking)
+ private final long sessionUpSince;
+ //milliseconds to timeout a new session that stays idle
+ private final long IDLE_TIMEOUT = 3*60*1000;
+
//buffer size for storing data
private final long bufferSize;
@@ -150,6 +155,7 @@
public UDTReceiver(UDTSession session,UDPEndPoint endpoint){
this.endpoint = endpoint;
this.session=session;
+ this.sessionUpSince=System.currentTimeMillis();
this.statistics=session.getStatistics();
if(!session.isReady())throw new IllegalStateException("UDTSession is not ready.");
ackHistoryWindow = new AckHistoryWindow(16);
@@ -295,7 +301,7 @@
UDTSender sender=session.getSocket().getSender();
//put all the unacknowledged packets in the senders loss list
sender.putUnacknowledgedPacketsIntoLossList();
- if(expCount>16){
+ if(expCount>16 && System.currentTimeMillis()-sessionUpSince > IDLE_TIMEOUT){
if(!connectionExpiryDisabled &&!stopped){
sendShutdown();
stop();
@@ -508,6 +514,10 @@
expCount=0;
}
+ protected void resetEXPCount(){
+ expCount=0;
+ }
+
protected void onShutdown()throws IOException{
stop();
}
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -96,9 +96,6 @@
//last acknowledge number, initialised to the initial sequence number
private long lastAckSequenceNumber;
- //instant when the last packet was sent
- private long lastSentTime=0;
-
//size of the send queue
public final int sendQueueLength;
@@ -199,6 +196,9 @@
NegativeAcknowledgement nak=(NegativeAcknowledgement)p;
onNAKPacketReceived(nak);
}
+ else if (p instanceof KeepAlive) {
+ session.getSocket().getReceiver().resetEXPCount();
+ }
}
protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{
@@ -248,14 +248,16 @@
for(Integer i: nak.getDecodedLossInfo()){
senderLossList.insert(new SenderLossListEntry(i));
}
- if(logger.isLoggable(Level.FINER)){
- logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets: "+nak.getDecodedLossInfo());
- }
session.getCongestionControl().onNAK(nak.getDecodedLossInfo());
- //reset EXP. EXP is in the receiver currently.... maybe move to SOCKET?
session.getSocket().getReceiver().resetEXPTimer();
statistics.incNumberOfNAKReceived();
statistics.storeParameters();
+
+ //if(logger.isLoggable(Level.FINER)){
+ System.out.println("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, "
+ +"set send period to "+session.getCongestionControl().getSendInterval());
+ //}
+
return;
}
@@ -279,16 +281,19 @@
* sender algorithm
*/
public void senderAlgorithm()throws InterruptedException, IOException{
+ long iterationStart=Util.getCurrentTime();
//if the sender's loss list is not empty
SenderLossListEntry entry=senderLossList.getFirstEntry();
if (entry!=null) {
long seqNumber = entry.getSequenceNumber();
+
+ //TODO
//if the current seqNumber is 16n,check the timeOut in the
//loss list and send a message drop request.
- if((seqNumber%16)==0){
- //TODO
+ //if((seqNumber%16)==0){
//sendLossList.checkTimeOut(timeToLive);
- }
+ //}
+
try {
//retransmit the packet with the first entry in the list
//as sequence number and remove it from the list
@@ -307,13 +312,13 @@
//if the number of unacknowledged data packets does not exceed the congestion
//and the flow window sizes, pack a new packet
int unAcknowledged=unacknowledged.get();
- double snd=session.getCongestionControl().getSendInterval();
+
if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
&& unAcknowledged<session.getFlowWindowSize()){
- if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){
- statistics.incNumberOfCCSlowDownEvents();
- return;
- }
+// if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){
+// statistics.incNumberOfCCSlowDownEvents();
+// return;
+// }
if(sendQueue.size()==0){
Thread.yield();
return;
@@ -321,7 +326,6 @@
DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS);
if(dp!=null){
send(dp);
- lastSentTime=Util.getCurrentTime();
largestSentSequenceNumber=dp.getPacketSequenceNumber();
}
}else{
@@ -332,6 +336,17 @@
Thread.sleep(1);
//waitForAck();
}
+
+ //wait
+ double snd=session.getCongestionControl().getSendInterval();
+ long passed=Util.getCurrentTime()-iterationStart;
+ while(snd-passed>0){
+ //busy wait, but we cannot wait with microsecond precision
+ if(snd-passed>750)Thread.sleep(1);
+ statistics.incNumberOfCCSlowDownEvents();
+ passed=Util.getCurrentTime()-iterationStart;
+ }
+
}
/**
Modified: udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -35,7 +35,8 @@
import udt.util.CircularArray;
/**
- * a circular array that records time intervals between two data packets
+ * a circular array that records time intervals between two probing data packets.
+ * It is used to determine the estimated link capacity.
* @see {@link CircularArray}
*
*/
@@ -63,6 +64,7 @@
median+=circularArray.get(i).doubleValue();
}
median=median/num;
+
//median filtering
double upper=median*8;
double lower=median/8;
@@ -76,9 +78,9 @@
count++;
}
}
- median=total/count;
- //System.out.println("median "+median);
- return median;
+ double res=total/count;
+ //System.out.println("median: "+median+" filtered "+res);
+ return res;
}
/**
Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -18,6 +18,8 @@
}while(!serverStarted);
File f=new File("src/test/java/datafile");
+ f=new File("/tmp/200MB");
+
File tmp=File.createTempFile("udtest-", null);
String[] args=new String[]{"localhost","65321",f.getAbsolutePath(),tmp.getAbsolutePath()};
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -20,7 +20,7 @@
boolean running=false;
//how many
- int num_packets=300;
+ int num_packets=200;
//how large is a single packet
int size=1*1024*1024;
@@ -34,7 +34,7 @@
//System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName());
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
- UDTReceiver.connectionExpiryDisabled=true;
+ //UDTReceiver.connectionExpiryDisabled=true;
doTest();
}
@@ -109,7 +109,7 @@
c=is.read(buf);
if(c<0)break;
else{
- //md5.update(buf, 0, c);
+ md5.update(buf, 0, c);
total+=c;
Thread.yield();
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|