udt-java-commits Mailing List for UDT-Java (Page 3)
Status: Alpha
Brought to you by:
bschuller
You can subscribe to this list here.
| 2010 |
Jan
|
Feb
|
Mar
|
Apr
(17) |
May
(4) |
Jun
(1) |
Jul
|
Aug
(3) |
Sep
(7) |
Oct
|
Nov
(1) |
Dec
|
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2011 |
Jan
(1) |
Feb
(1) |
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
(10) |
Sep
|
Oct
(1) |
Nov
(1) |
Dec
(2) |
| 2012 |
Jan
|
Feb
(4) |
Mar
(1) |
Apr
|
May
(1) |
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
|
From: <bsc...@us...> - 2010-04-21 20:36:43
|
Revision: 22
http://udt-java.svn.sourceforge.net/udt-java/?rev=22&view=rev
Author: bschuller
Date: 2010-04-21 20:36:36 +0000 (Wed, 21 Apr 2010)
Log Message:
-----------
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-21 15:45:00 UTC (rev 21)
+++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-21 20:36:36 UTC (rev 22)
@@ -7,62 +7,61 @@
import udt.util.UDTStatistics;
import udt.util.Util;
+/**
+ * default UDT congestion control.<br/>
+ *
+ * The algorithm is adapted from the C++ reference implementation.
+ */
public class UDTCongestionControl implements CongestionControl {
private static final Logger logger=Logger.getLogger(UDTCongestionControl.class.getName());
private final UDTSession session;
-
+
private final UDTStatistics statistics;
-
+
//round trip time in microseconds
private long roundTripTime=2*Util.getSYNTime();
-
+
//rate in packets per second
private long packetArrivalRate=0;
-
+
//link capacity in packets per second
private long estimatedLinkCapacity=0;
-
- long maxControlWindowSize=128;
// Packet sending period = packet send interval, in microseconds
private double packetSendingPeriod=1;
+
// Congestion window size, in packets
private long congestionWindowSize=16;
-
- //number of packets to be increased in the next SYN period
- private double numOfIncreasingPacket;
-
+
//last rate increase time (microsecond value)
long lastRateIncreaseTime=Util.getCurrentTime();
-
+
/*if in slow start phase*/
boolean slowStartPhase=true;
-
+
/*last ACKed seq no*/
long lastAckSeqNumber=-1;
-
+
/*max packet seq. no. sent out when last decrease happened*/
private long lastDecreaseSeqNo;
- //value of packetSendPeriod when last decrease happened
- long lastDecreasePeriod;
-
//NAK counter
- long nACKCount=1;
-
+ private long nACKCount=1;
+
//number of decreases in a congestion epoch
long decCount=1;
-
+
//random threshold on decrease by number of loss events
long decreaseRandom=1;
-
+
//average number of NAKs per congestion
long averageNACKNum;
- boolean loss=false;
-
+ //this flag avoids immediate rate increase after a NAK
+ private boolean loss=false;
+
public UDTCongestionControl(UDTSession session){
this.session=session;
this.statistics=session.getStatistics();
@@ -108,7 +107,7 @@
public double getSendInterval(){
return packetSendingPeriod;
}
-
+
/**
* congestionWindowSize
* @return
@@ -125,7 +124,7 @@
if(slowStartPhase){
congestionWindowSize+=ackSeqno-lastAckSeqNumber;
lastAckSeqNumber = ackSeqno;
-
+
//but not beyond a maximum size
if(congestionWindowSize>session.getFlowWindowSize()){
System.out.println("slow start ends on ACK");
@@ -137,7 +136,7 @@
packetSendingPeriod=(double)congestionWindowSize/(roundTripTime+Util.getSYNTimeD());
}
}
-
+
}else{
//1.if it is not in slow start phase,set the congestion window size
//to the product of packet arrival rate and(rtt +SYN)
@@ -148,51 +147,51 @@
}
}
- //no rate increase in slow start
+ //no rate increase during slow start
if(slowStartPhase)return;
-
+
+ //no rate increase "immediately" after a NAK
if(loss){
loss=false;
return;
}
-
- //4.compute the number of sent packets to be increase in the next SYN period
- //and update the send intervall
- numOfIncreasingPacket=computeNumOfIncreasingPacket();
-
- //4.update the send period :
+
+ //4. compute the increase in sent packets for the next SYN period
+ double numOfIncreasingPacket=computeNumOfIncreasingPacket();
+
+ //5. update the send period
double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD());
packetSendingPeriod=factor*packetSendingPeriod;
//packetSendingPeriod=0.995*packetSendingPeriod;
//System.out.println("dec snd factor "+factor+" to "+packetSendingPeriod);
-
+
statistics.setSendPeriod(packetSendingPeriod);
}
private final long PS=UDPEndPoint.DATAGRAM_SIZE;
private final double BetaDivPS=0.0000015/PS;
-
+
//see spec page 16
private double computeNumOfIncreasingPacket (){
- //link capacity and sending speed, in packets per second
- double B=estimatedLinkCapacity;
- double C=1000000.0/packetSendingPeriod;
-
- if(B<=C)return 1.0/UDPEndPoint.DATAGRAM_SIZE;
-
- double exp=Math.ceil(Math.log10((B-C)*PS*8));
- double power10 = Math.pow( 10.0, exp)* BetaDivPS;
- double inc = Math.max(power10, 1/PS);
- return inc;
+ //difference in link capacity and sending speed, in packets per second
+ double remaining=estimatedLinkCapacity-1000000.0/packetSendingPeriod;
+
+ if(remaining<=0){
+ return 1.0/UDPEndPoint.DATAGRAM_SIZE;
+ }
+ else{
+ double exp=Math.ceil(Math.log10(remaining*PS*8));
+ double power10 = Math.pow( 10.0, exp)* BetaDivPS;
+ return Math.max(power10, 1/PS);
+ }
}
-
+
/* (non-Javadoc)
* @see udt.CongestionControl#onNAK(java.util.List)
*/
public void onNAK(List<Integer>lossInfo){
loss=true;
- long firstBiggestlossSeqNo=lossInfo.get(lossInfo.size()-1);
-
+ long firstBiggestlossSeqNo=lossInfo.get(0);
nACKCount++;
/*1) If it is in slow start phase, set inter-packet interval to
1/recvrate. Slow start ends. Stop. */
@@ -206,12 +205,10 @@
slowStartPhase = false;
return;
}
-
+
long currentMaxSequenceNumber=session.getSocket().getSender().getCurrentSequenceNumber();
-
// 2)If this NAK starts a new congestion epoch
if(firstBiggestlossSeqNo>lastDecreaseSeqNo){
-
// -increase inter-packet interval
packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125);
// -Update AvgNAKNum(the average number of NAKs per congestion)
@@ -224,20 +221,19 @@
// -Update LastDecSeq
lastDecreaseSeqNo = currentMaxSequenceNumber;
// -Stop.
- statistics.setSendPeriod(packetSendingPeriod);
}
-
//* 3) If DecCount <= 5, and NAKCount == DecCount * DecRandom:
- if(decCount<=5 && nACKCount==decCount*decreaseRandom){
+ else if(decCount<=5 && nACKCount==decCount*decreaseRandom){
// a. Update SND period: SND = SND * 1.125;
packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125);
// b. Increase DecCount by 1;
decCount++;
// c. Record the current largest sent sequence number (LastDecSeq).
lastDecreaseSeqNo= currentMaxSequenceNumber;
- statistics.setSendPeriod(packetSendingPeriod);
- return;
}
+
+ statistics.setSendPeriod(packetSendingPeriod);
+ return;
}
/* (non-Javadoc)
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-21 15:45:00 UTC (rev 21)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-21 20:36:36 UTC (rev 22)
@@ -253,10 +253,10 @@
statistics.incNumberOfNAKReceived();
statistics.storeParameters();
- //if(logger.isLoggable(Level.FINER)){
- System.out.println("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, "
+ if(logger.isLoggable(Level.FINER)){
+ logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, "
+"set send period to "+session.getCongestionControl().getSendInterval());
- //}
+ }
return;
}
@@ -315,12 +315,9 @@
if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
&& unAcknowledged<session.getFlowWindowSize()){
-// if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){
-// statistics.incNumberOfCCSlowDownEvents();
-// return;
-// }
+
if(sendQueue.size()==0){
- Thread.yield();
+ //Thread.yield();
return;
}
DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS);
@@ -340,10 +337,12 @@
//wait
double snd=session.getCongestionControl().getSendInterval();
long passed=Util.getCurrentTime()-iterationStart;
+ int x=0;
while(snd-passed>0){
- //busy wait, but we cannot wait with microsecond precision
+ if(x++==0)statistics.incNumberOfCCSlowDownEvents();
+ //we cannot wait with microsecond precision
if(snd-passed>750)Thread.sleep(1);
- statistics.incNumberOfCCSlowDownEvents();
+ else Thread.yield();
passed=Util.getCurrentTime()-iterationStart;
}
@@ -394,8 +393,9 @@
return largestSentSequenceNumber>=sequenceNumber;
}
+
boolean haveLostPackets(){
- return senderLossList.isEmpty();
+ return !senderLossList.isEmpty();
}
/**
Modified: udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-21 15:45:00 UTC (rev 21)
+++ udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-21 20:36:36 UTC (rev 22)
@@ -79,7 +79,6 @@
}
}
double res=total/count;
- //System.out.println("median: "+median+" filtered "+res);
return res;
}
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-21 15:45:00 UTC (rev 21)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-21 20:36:36 UTC (rev 22)
@@ -31,7 +31,6 @@
*********************************************************************************/
package udt.sender;
-import java.util.Iterator;
import java.util.concurrent.PriorityBlockingQueue;
/**
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-21 15:45:00 UTC (rev 21)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-21 20:36:36 UTC (rev 22)
@@ -20,7 +20,7 @@
boolean running=false;
//how many
- int num_packets=200;
+ int num_packets=100;
//how large is a single packet
int size=1*1024*1024;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2010-04-21 15:45:06
|
Revision: 21
http://udt-java.svn.sourceforge.net/udt-java/?rev=21&view=rev
Author: bschuller
Date: 2010-04-21 15:45:00 +0000 (Wed, 21 Apr 2010)
Log Message:
-----------
rate control stuff --- still not so great IMO
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/ClientSession.java
udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java
udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
Modified: udt-java/trunk/src/main/java/udt/ClientSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/ClientSession.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/main/java/udt/ClientSession.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -107,8 +107,6 @@
}
active = true;
try{
- //packet received means we should not yet expire
- socket.getReceiver().resetEXPTimer();
if(packet.forSender()){
socket.getSender().receive(lastPacket);
}else{
Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -1,12 +1,16 @@
package udt;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import udt.util.UDTStatistics;
import udt.util.Util;
public class UDTCongestionControl implements CongestionControl {
+ private static final Logger logger=Logger.getLogger(UDTCongestionControl.class.getName());
+
private final UDTSession session;
private final UDTStatistics statistics;
@@ -49,14 +53,16 @@
long nACKCount=1;
//number of decreases in a congestion epoch
- long congestionEpochDecreaseCount=1;
+ long decCount=1;
//random threshold on decrease by number of loss events
- long decreaseRandom;
+ long decreaseRandom=1;
//average number of NAKs per congestion
long averageNACKNum;
+ boolean loss=false;
+
public UDTCongestionControl(UDTSession session){
this.session=session;
this.statistics=session.getStatistics();
@@ -115,41 +121,65 @@
* @see udt.CongestionControl#onACK(long)
*/
public void onACK(long ackSeqno){
- //the fixed size of a UDT packet
- long maxSegmentSize=UDPEndPoint.DATAGRAM_SIZE;
-
- //1.if it is in slow start phase,set the congestion window size
- //to the product of packet arrival rate and(rtt +SYN)
- double A=packetArrivalRate*(roundTripTime+Util.getSYNTime());
- //System.out.println("rate "+packetArrivalRate+" rtt "+roundTripTime+" A: "+A);
+ //increase window during slow start
if(slowStartPhase){
- congestionWindowSize=16;
- slowStartPhase=false;
- return;
+ congestionWindowSize+=ackSeqno-lastAckSeqNumber;
+ lastAckSeqNumber = ackSeqno;
+
+ //but not beyond a maximum size
+ if(congestionWindowSize>session.getFlowWindowSize()){
+ System.out.println("slow start ends on ACK");
+ slowStartPhase=false;
+ if(packetArrivalRate>0){
+ packetSendingPeriod=1000000.0/packetArrivalRate;
+ }
+ else{
+ packetSendingPeriod=(double)congestionWindowSize/(roundTripTime+Util.getSYNTimeD());
+ }
+ }
+
}else{
+ //1.if it is not in slow start phase,set the congestion window size
+ //to the product of packet arrival rate and(rtt +SYN)
+ double A=packetArrivalRate/1000000.0*(roundTripTime+Util.getSYNTimeD());
congestionWindowSize=(long)A+16;
+ if(logger.isLoggable(Level.FINER)){
+ logger.finer("receive rate "+packetArrivalRate+" rtt "+roundTripTime+" set to window size: "+(A+16));
+ }
}
+ //no rate increase in slow start
+ if(slowStartPhase)return;
+
+ if(loss){
+ loss=false;
+ return;
+ }
+
//4.compute the number of sent packets to be increase in the next SYN period
//and update the send intervall
- if(estimatedLinkCapacity<= packetArrivalRate){
- numOfIncreasingPacket= 1.0/maxSegmentSize;
- }else{
- numOfIncreasingPacket=computeNumOfIncreasingPacket();
- }
+ numOfIncreasingPacket=computeNumOfIncreasingPacket();
+
//4.update the send period :
double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD());
- packetSendingPeriod=packetSendingPeriod*factor;
+ packetSendingPeriod=factor*packetSendingPeriod;
+ //packetSendingPeriod=0.995*packetSendingPeriod;
+ //System.out.println("dec snd factor "+factor+" to "+packetSendingPeriod);
+
statistics.setSendPeriod(packetSendingPeriod);
}
+ private final long PS=UDPEndPoint.DATAGRAM_SIZE;
+ private final double BetaDivPS=0.0000015/PS;
+
//see spec page 16
- final double BetaDivPS=0.0000015/UDPEndPoint.DATAGRAM_SIZE;
private double computeNumOfIncreasingPacket (){
- long B=estimatedLinkCapacity;
- double C=1.0/packetSendingPeriod;
- if(B<=C)return C;
- long PS=UDPEndPoint.DATAGRAM_SIZE;
+ //link capacity and sending speed, in packets per second
+ double B=estimatedLinkCapacity;
+ double C=1000000.0/packetSendingPeriod;
+
+ if(B<=C)return 1.0/UDPEndPoint.DATAGRAM_SIZE;
+
double exp=Math.ceil(Math.log10((B-C)*PS*8));
double power10 = Math.pow( 10.0, exp)* BetaDivPS;
double inc = Math.max(power10, 1/PS);
@@ -160,49 +190,49 @@
* @see udt.CongestionControl#onNAK(java.util.List)
*/
public void onNAK(List<Integer>lossInfo){
+ loss=true;
long firstBiggestlossSeqNo=lossInfo.get(lossInfo.size()-1);
- long currentMaxSequenceNumber=session.getSocket().getSender().getCurrentSequenceNumber();
- lastAckSeqNumber = currentMaxSequenceNumber;
+
nACKCount++;
/*1) If it is in slow start phase, set inter-packet interval to
1/recvrate. Slow start ends. Stop. */
if(slowStartPhase){
if(packetArrivalRate>0){
- packetSendingPeriod = 1e6/packetArrivalRate;
+ packetSendingPeriod = 100000.0/packetArrivalRate;
}
else{
- packetSendingPeriod=congestionWindowSize*(roundTripTime+Util.getSYNTime());
+ packetSendingPeriod=congestionWindowSize/(roundTripTime+Util.getSYNTime());
}
slowStartPhase = false;
return;
}
-
- //start new congestion epoch
+ long currentMaxSequenceNumber=session.getSocket().getSender().getCurrentSequenceNumber();
+
+ // 2)If this NAK starts a new congestion epoch
if(firstBiggestlossSeqNo>lastDecreaseSeqNo){
- // 2)If this NAK starts a new congestion epoch
+
// -increase inter-packet interval
packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125);
// -Update AvgNAKNum(the average number of NAKs per congestion)
averageNACKNum = (int)Math.ceil(averageNACKNum*0.875 + nACKCount*0.125);
- // -reset NAKCount to 1,
+ // -reset NAKCount and DecCount to 1,
nACKCount=1;
- /* - compute DecRandom to a random (average distribution) number between 1 and AvgNAKNum.. */
+ decCount=1;
+ /* - compute DecRandom to a random (average distribution) number between 1 and AvgNAKNum */
decreaseRandom =(int)Math.ceil((averageNACKNum-1)*Math.random()+1);
// -Update LastDecSeq
lastDecreaseSeqNo = currentMaxSequenceNumber;
// -Stop.
statistics.setSendPeriod(packetSendingPeriod);
- return;
}
//* 3) If DecCount <= 5, and NAKCount == DecCount * DecRandom:
- if(congestionEpochDecreaseCount<=5 &&
- nACKCount==congestionEpochDecreaseCount*decreaseRandom){
+ if(decCount<=5 && nACKCount==decCount*decreaseRandom){
// a. Update SND period: SND = SND * 1.125;
packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125);
// b. Increase DecCount by 1;
- congestionEpochDecreaseCount++;
+ decCount++;
// c. Record the current largest sent sequence number (LastDecSeq).
lastDecreaseSeqNo= currentMaxSequenceNumber;
statistics.setSendPeriod(packetSendingPeriod);
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -110,7 +110,7 @@
long packetArrivalSpeed;
//round trip time, calculated from ACK/ACK2 pairs
- long roundTripTime=10*Util.getSYNTime();
+ long roundTripTime=50*1000;
//round trip time variance
long roundTripTimeVar=roundTripTime/2;
@@ -125,8 +125,13 @@
private long nextEXP;
//microseconds to next EXP event
- private long EXP_INTERVAL=10*Util.getSYNTime();
+ private long EXP_INTERVAL=Util.getSYNTime();
+ //instant when the session was created (for expiry checking)
+ private final long sessionUpSince;
+ //milliseconds to timeout a new session that stays idle
+ private final long IDLE_TIMEOUT = 3*60*1000;
+
//buffer size for storing data
private final long bufferSize;
@@ -150,6 +155,7 @@
public UDTReceiver(UDTSession session,UDPEndPoint endpoint){
this.endpoint = endpoint;
this.session=session;
+ this.sessionUpSince=System.currentTimeMillis();
this.statistics=session.getStatistics();
if(!session.isReady())throw new IllegalStateException("UDTSession is not ready.");
ackHistoryWindow = new AckHistoryWindow(16);
@@ -295,7 +301,7 @@
UDTSender sender=session.getSocket().getSender();
//put all the unacknowledged packets in the senders loss list
sender.putUnacknowledgedPacketsIntoLossList();
- if(expCount>16){
+ if(expCount>16 && System.currentTimeMillis()-sessionUpSince > IDLE_TIMEOUT){
if(!connectionExpiryDisabled &&!stopped){
sendShutdown();
stop();
@@ -508,6 +514,10 @@
expCount=0;
}
+ protected void resetEXPCount(){
+ expCount=0;
+ }
+
protected void onShutdown()throws IOException{
stop();
}
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -96,9 +96,6 @@
//last acknowledge number, initialised to the initial sequence number
private long lastAckSequenceNumber;
- //instant when the last packet was sent
- private long lastSentTime=0;
-
//size of the send queue
public final int sendQueueLength;
@@ -199,6 +196,9 @@
NegativeAcknowledgement nak=(NegativeAcknowledgement)p;
onNAKPacketReceived(nak);
}
+ else if (p instanceof KeepAlive) {
+ session.getSocket().getReceiver().resetEXPCount();
+ }
}
protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{
@@ -248,14 +248,16 @@
for(Integer i: nak.getDecodedLossInfo()){
senderLossList.insert(new SenderLossListEntry(i));
}
- if(logger.isLoggable(Level.FINER)){
- logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets: "+nak.getDecodedLossInfo());
- }
session.getCongestionControl().onNAK(nak.getDecodedLossInfo());
- //reset EXP. EXP is in the receiver currently.... maybe move to SOCKET?
session.getSocket().getReceiver().resetEXPTimer();
statistics.incNumberOfNAKReceived();
statistics.storeParameters();
+
+ //if(logger.isLoggable(Level.FINER)){
+ System.out.println("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, "
+ +"set send period to "+session.getCongestionControl().getSendInterval());
+ //}
+
return;
}
@@ -279,16 +281,19 @@
* sender algorithm
*/
public void senderAlgorithm()throws InterruptedException, IOException{
+ long iterationStart=Util.getCurrentTime();
//if the sender's loss list is not empty
SenderLossListEntry entry=senderLossList.getFirstEntry();
if (entry!=null) {
long seqNumber = entry.getSequenceNumber();
+
+ //TODO
//if the current seqNumber is 16n,check the timeOut in the
//loss list and send a message drop request.
- if((seqNumber%16)==0){
- //TODO
+ //if((seqNumber%16)==0){
//sendLossList.checkTimeOut(timeToLive);
- }
+ //}
+
try {
//retransmit the packet with the first entry in the list
//as sequence number and remove it from the list
@@ -307,13 +312,13 @@
//if the number of unacknowledged data packets does not exceed the congestion
//and the flow window sizes, pack a new packet
int unAcknowledged=unacknowledged.get();
- double snd=session.getCongestionControl().getSendInterval();
+
if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
&& unAcknowledged<session.getFlowWindowSize()){
- if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){
- statistics.incNumberOfCCSlowDownEvents();
- return;
- }
+// if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){
+// statistics.incNumberOfCCSlowDownEvents();
+// return;
+// }
if(sendQueue.size()==0){
Thread.yield();
return;
@@ -321,7 +326,6 @@
DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS);
if(dp!=null){
send(dp);
- lastSentTime=Util.getCurrentTime();
largestSentSequenceNumber=dp.getPacketSequenceNumber();
}
}else{
@@ -332,6 +336,17 @@
Thread.sleep(1);
//waitForAck();
}
+
+ //wait
+ double snd=session.getCongestionControl().getSendInterval();
+ long passed=Util.getCurrentTime()-iterationStart;
+ while(snd-passed>0){
+ //busy wait, but we cannot wait with microsecond precision
+ if(snd-passed>750)Thread.sleep(1);
+ statistics.incNumberOfCCSlowDownEvents();
+ passed=Util.getCurrentTime()-iterationStart;
+ }
+
}
/**
Modified: udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -35,7 +35,8 @@
import udt.util.CircularArray;
/**
- * a circular array that records time intervals between two data packets
+ * a circular array that records time intervals between two probing data packets.
+ * It is used to determine the estimated link capacity.
* @see {@link CircularArray}
*
*/
@@ -63,6 +64,7 @@
median+=circularArray.get(i).doubleValue();
}
median=median/num;
+
//median filtering
double upper=median*8;
double lower=median/8;
@@ -76,9 +78,9 @@
count++;
}
}
- median=total/count;
- //System.out.println("median "+median);
- return median;
+ double res=total/count;
+ //System.out.println("median: "+median+" filtered "+res);
+ return res;
}
/**
Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -18,6 +18,8 @@
}while(!serverStarted);
File f=new File("src/test/java/datafile");
+ f=new File("/tmp/200MB");
+
File tmp=File.createTempFile("udtest-", null);
String[] args=new String[]{"localhost","65321",f.getAbsolutePath(),tmp.getAbsolutePath()};
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-20 21:21:15 UTC (rev 20)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-21 15:45:00 UTC (rev 21)
@@ -20,7 +20,7 @@
boolean running=false;
//how many
- int num_packets=300;
+ int num_packets=200;
//how large is a single packet
int size=1*1024*1024;
@@ -34,7 +34,7 @@
//System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName());
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
- UDTReceiver.connectionExpiryDisabled=true;
+ //UDTReceiver.connectionExpiryDisabled=true;
doTest();
}
@@ -109,7 +109,7 @@
c=is.read(buf);
if(c<0)break;
else{
- //md5.update(buf, 0, c);
+ md5.update(buf, 0, c);
total+=c;
Thread.yield();
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2010-04-20 21:21:21
|
Revision: 20
http://udt-java.svn.sourceforge.net/udt-java/?rev=20&view=rev
Author: bschuller
Date: 2010-04-20 21:21:15 +0000 (Tue, 20 Apr 2010)
Log Message:
-----------
disable connection expiry
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
udt-java/trunk/src/main/java/udt/util/SendFile.java
udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-20 20:17:07 UTC (rev 19)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-20 21:21:15 UTC (rev 20)
@@ -125,7 +125,7 @@
private long nextEXP;
//microseconds to next EXP event
- private long EXP_INTERVAL=2*Util.getSYNTime();
+ private long EXP_INTERVAL=10*Util.getSYNTime();
//buffer size for storing data
private final long bufferSize;
@@ -513,7 +513,6 @@
}
public void stop()throws IOException{
- System.out.println("STOP");
stopped=true;
session.getSocket().close();
//stop our sender as well
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-20 20:17:07 UTC (rev 19)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-20 21:21:15 UTC (rev 20)
@@ -40,6 +40,7 @@
import udt.UDTClient;
import udt.UDTInputStream;
import udt.UDTOutputStream;
+import udt.UDTReceiver;
/**
* helper class for receiving a single file via UDT
@@ -66,6 +67,7 @@
public void run(){
configure();
try{
+ UDTReceiver.connectionExpiryDisabled=true;
InetAddress myHost=localIP!=null?InetAddress.getByName(localIP):InetAddress.getLocalHost();
UDTClient client=localPort!=-1?new UDTClient(myHost,localPort):new UDTClient(myHost);
client.connect(serverHost, serverPort);
Modified: udt-java/trunk/src/main/java/udt/util/SendFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-20 20:17:07 UTC (rev 19)
+++ udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-20 21:21:15 UTC (rev 20)
@@ -42,6 +42,7 @@
import udt.UDTInputStream;
import udt.UDTOutputStream;
+import udt.UDTReceiver;
import udt.UDTServerSocket;
import udt.UDTSocket;
import udt.packets.PacketUtil;
@@ -72,6 +73,7 @@
public void run(){
configure();
try{
+ UDTReceiver.connectionExpiryDisabled=true;
InetAddress myHost=localIP!=null?InetAddress.getByName(localIP):InetAddress.getLocalHost();
UDTServerSocket server=new UDTServerSocket(myHost,serverPort);
while(true){
@@ -134,7 +136,7 @@
bb.get(fileName);
File file=new File(new String(fileName));
- System.out.println("[SendFile] File requested: "+file.getPath());
+ System.out.println("[SendFile] File requested: '"+file.getPath()+"'");
FileInputStream fis=new FileInputStream(file);
try{
Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-20 20:17:07 UTC (rev 19)
+++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-20 21:21:15 UTC (rev 20)
@@ -13,7 +13,10 @@
public void test1()throws Exception{
runServer();
- while(!serverStarted)Thread.sleep(100);
+ do{
+ Thread.sleep(500);
+ }while(!serverStarted);
+
File f=new File("src/test/java/datafile");
File tmp=File.createTempFile("udtest-", null);
@@ -28,9 +31,9 @@
private void runServer(){
Runnable r=new Runnable(){
public void run(){
- serverStarted=true;
String []args=new String[]{"65321"};
try{
+ serverStarted=true;
SendFile.main(args);
}catch(Exception ex){
ex.printStackTrace();
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2010-04-20 20:17:14
|
Revision: 19
http://udt-java.svn.sourceforge.net/udt-java/?rev=19&view=rev
Author: bschuller
Date: 2010-04-20 20:17:07 +0000 (Tue, 20 Apr 2010)
Log Message:
-----------
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/CongestionControl.java
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/UDTSession.java
udt-java/trunk/src/main/java/udt/UDTSocket.java
udt-java/trunk/src/main/java/udt/packets/DataPacket.java
udt-java/trunk/src/main/java/udt/packets/PacketFactory.java
udt-java/trunk/src/main/java/udt/packets/PacketUtil.java
udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/main/java/udt/util/Util.java
udt-java/trunk/src/test/java/udt/NullCongestionControl.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
udt-java/trunk/src/test/java/udt/performance/UDPTest.java
Modified: udt-java/trunk/src/main/java/udt/CongestionControl.java
===================================================================
--- udt-java/trunk/src/main/java/udt/CongestionControl.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/CongestionControl.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -19,12 +19,18 @@
public abstract void setRTT(long rtt, long rttVar);
/**
- * set packet arrival rate and link capacity
+ * update packet arrival rate and link capacity with the
+ * values received in an ACK packet
* @param rate
* @param linkCapacity
*/
- public abstract void setPacketArrivalRate(long rate, long linkCapacity);
+ public abstract void updatePacketArrivalRate(long rate, long linkCapacity);
+ public long getPacketArrivalRate();
+
+ public long getEstimatedLinkCapacity();
+
+
/**
* Inter-packet interval in seconds
* @return
@@ -55,13 +61,13 @@
public abstract void onTimeout();
/**
- * Callback function to be called when a data is sent.
+ * Callback function to be called when a data packet is sent.
* @param packetSeqNo: the data sequence number.
*/
public abstract void onPacketSend(long packetSeqNo);
/**
- * Callback function to be called when a data is received.
+ * Callback function to be called when a data packet is received.
* @param packetSeqNo: the data sequence number.
*/
public abstract void onPacketReceive(long packetSeqNo);
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -235,6 +235,8 @@
* </ul>
* @throws IOException
*/
+ private long lastDestID=-1;
+ private UDTSession lastSession;
protected void doReceive()throws IOException{
try{
try{
@@ -265,7 +267,16 @@
else{
//dispatch to existing session
- UDTSession session=sessions.get(packet.getDestinationID());
+ long dest=packet.getDestinationID();
+ UDTSession session;
+ if(dest==lastDestID){
+ session=lastSession;
+ }
+ else{
+ session=sessions.get(dest);
+ lastSession=session;
+ lastDestID=dest;
+ }
if(session==null){
logger.warning("Unknown session <"+packet.getDestinationID()+"> requested from <"+peer+"> packet type "+packet.getClass().getName());
}
Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -8,15 +8,17 @@
public class UDTCongestionControl implements CongestionControl {
private final UDTSession session;
+
private final UDTStatistics statistics;
+
//round trip time in microseconds
- private long roundTripTime=10*Util.getSYNTime();
+ private long roundTripTime=2*Util.getSYNTime();
//rate in packets per second
- private long packetArrivalRate=100;
+ private long packetArrivalRate=0;
//link capacity in packets per second
- private long estimatedLinkCapacity;
+ private long estimatedLinkCapacity=0;
long maxControlWindowSize=128;
@@ -58,15 +60,14 @@
public UDTCongestionControl(UDTSession session){
this.session=session;
this.statistics=session.getStatistics();
- lastDecreaseSeqNo= session.getInitialSequenceNumber()-1;
+ lastDecreaseSeqNo=session.getInitialSequenceNumber()-1;
init();
}
/* (non-Javadoc)
* @see udt.CongestionControl#init()
*/
- public void init() {
-
+ public void init() {
}
/* (non-Javadoc)
@@ -79,16 +80,27 @@
/* (non-Javadoc)
* @see udt.CongestionControl#setPacketArrivalRate(long, long)
*/
- public void setPacketArrivalRate(long rate, long linkCapacity){
- this.packetArrivalRate=rate;
- this.estimatedLinkCapacity=linkCapacity;
+ public void updatePacketArrivalRate(long rate, long linkCapacity){
+ //see spec p. 14.
+ if(packetArrivalRate>0)packetArrivalRate=(packetArrivalRate*7+rate)/8;
+ else packetArrivalRate=rate;
+ if(estimatedLinkCapacity>0)estimatedLinkCapacity=(estimatedLinkCapacity*7+linkCapacity)/8;
+ else estimatedLinkCapacity=linkCapacity;
}
+ public long getPacketArrivalRate() {
+ return packetArrivalRate;
+ }
+
+ public long getEstimatedLinkCapacity() {
+ return estimatedLinkCapacity;
+ }
+
/* (non-Javadoc)
* @see udt.CongestionControl#getSendInterval()
*/
public double getSendInterval(){
- return packetSendingPeriod ;
+ return packetSendingPeriod;
}
/**
@@ -96,7 +108,7 @@
* @return
*/
public long getCongestionWindowSize(){
- return 2048;//congestionWindowSize;
+ return congestionWindowSize;
}
/* (non-Javadoc)
@@ -109,6 +121,7 @@
//1.if it is in slow start phase,set the congestion window size
//to the product of packet arrival rate and(rtt +SYN)
double A=packetArrivalRate*(roundTripTime+Util.getSYNTime());
+ //System.out.println("rate "+packetArrivalRate+" rtt "+roundTripTime+" A: "+A);
if(slowStartPhase){
congestionWindowSize=16;
slowStartPhase=false;
@@ -120,26 +133,26 @@
//4.compute the number of sent packets to be increase in the next SYN period
//and update the send intervall
if(estimatedLinkCapacity<= packetArrivalRate){
- numOfIncreasingPacket= 1/maxSegmentSize;
+ numOfIncreasingPacket= 1.0/maxSegmentSize;
}else{
numOfIncreasingPacket=computeNumOfIncreasingPacket();
}
//4.update the send period :
- packetSendingPeriod=packetSendingPeriod*Util.getSYNTimeSeconds()/
- (packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeSeconds());
+ double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD());
+ packetSendingPeriod=packetSendingPeriod*factor;
statistics.setSendPeriod(packetSendingPeriod);
}
- final double Beta=0.0000015/UDPEndPoint.DATAGRAM_SIZE;
+ //see spec page 16
+ final double BetaDivPS=0.0000015/UDPEndPoint.DATAGRAM_SIZE;
private double computeNumOfIncreasingPacket (){
- long B,C,S;
- B=estimatedLinkCapacity;
- C=packetArrivalRate;
- S=UDPEndPoint.DATAGRAM_SIZE;
-
- double logBase10=Math.log10( S*(B-C)*8 );
- double power10 = Math.pow( 10.0,Math.ceil (logBase10) )* Beta;
- double inc = Math.max(power10, 1/S);
+ long B=estimatedLinkCapacity;
+ double C=1.0/packetSendingPeriod;
+ if(B<=C)return C;
+ long PS=UDPEndPoint.DATAGRAM_SIZE;
+ double exp=Math.ceil(Math.log10((B-C)*PS*8));
+ double power10 = Math.pow( 10.0, exp)* BetaDivPS;
+ double inc = Math.max(power10, 1/PS);
return inc;
}
@@ -164,6 +177,7 @@
return;
}
+
//start new congestion epoch
if(firstBiggestlossSeqNo>lastDecreaseSeqNo){
// 2)If this NAK starts a new congestion epoch
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -81,8 +81,8 @@
private final PacketHistoryWindow packetHistoryWindow;
//for storing the arrival time of the last received data packet
- private long lastDataPacketArrivalTime=0;
-
+ private volatile long lastDataPacketArrivalTime=0;
+
//largest received data packet sequence number(LRSN)
private volatile long largestReceivedSeqNumber=0;
@@ -90,7 +90,7 @@
//last Ack number
private long lastAckNumber=0;
-
+
//largest Ack number ever acknowledged by ACK2
private volatile long largestAcknowledgedAckNumber=-1;
@@ -125,24 +125,24 @@
private long nextEXP;
//microseconds to next EXP event
- private long EXP_INTERVAL=1000000;
+ private long EXP_INTERVAL=2*Util.getSYNTime();
//buffer size for storing data
private final long bufferSize;
-
+
//stores packets to be sent
private final BlockingQueue<UDTPacket>handoffQueue=new ArrayBlockingQueue<UDTPacket>(32);
private Thread receiverThread;
private volatile boolean stopped=false;
-
+
/**
* if set to true connections will not expire, but will only be
* closed by a Shutdown message
*/
public static boolean connectionExpiryDisabled=false;
-
+
/**
* create a receiver with a valid {@link UDTSession}
* @param session
@@ -157,8 +157,8 @@
receiverLossList = new ReceiverLossList();
packetPairWindow = new PacketPairWindow(16);
nextACK=Util.getCurrentTime()+ACK_INTERVAL;
- nextNAK=Util.getCurrentTime()+NAK_INTERVAL;
- nextEXP=Util.getCurrentTime()+EXP_INTERVAL;
+ nextNAK=(long)(Util.getCurrentTime()+1.5*NAK_INTERVAL);
+ nextEXP=Util.getCurrentTime()+2*EXP_INTERVAL;
bufferSize=session.getReceiveBufferSize();
start();
}
@@ -194,7 +194,6 @@
* see specification P11.
*/
public void receiverAlgorithm()throws InterruptedException,IOException{
-
//check ACK timer
long currentTime=Util.getCurrentTime();
if(nextACK<currentTime){
@@ -254,7 +253,7 @@
return;
}else if (ackNumber==lastAckNumber) {
//or it is equals to the ackNumber in the last ACK
- //and the time interval between these two ACK packets ???
+ //and the time interval between these two ACK packets
//is less than 2 RTTs,do not send(stop)
long timeOfLastSentAck=ackHistoryWindow.getTime(lastAckNumber);
if(Util.getCurrentTime()-timeOfLastSentAck< 2*roundTripTime){
@@ -265,6 +264,7 @@
//if this ACK is not triggered by ACK timers,send out a light Ack and stop.
if(!isTriggeredByTimer){
ackSeqNumber=sendLightAcknowledgment(ackNumber);
+ return;
}
else{
//pack the packet speed and link capacity into the ACK packet and send it out.
@@ -320,11 +320,11 @@
Acknowledgment2 ack2=(Acknowledgment2)p;
onAck2PacketReceived(ack2);
}
-
+
else if (p instanceof Shutdown){
onShutdown();
}
-
+
//other packet types?
}
@@ -333,7 +333,7 @@
public static int dropRate=0;
//number of received data packets
private int n=0;
-
+
protected void onDataPacketReceived(DataPacket dp)throws IOException{
long currentSequenceNumber = dp.getPacketSequenceNumber();
//check whether to drop this packet
@@ -342,13 +342,13 @@
logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING");
return;
}
-
+
long currentDataPacketArrivalTime = Util.getCurrentTime();
/*(4).if the seqNo of the current data packet is 16n+1,record the
time interval between this packet and the last data packet
in the packet pair window*/
- if((currentSequenceNumber%16)==1){
+ if((currentSequenceNumber%16)==1 && lastDataPacketArrivalTime>0){
long interval=currentDataPacketArrivalTime -lastDataPacketArrivalTime;
packetPairWindow.add(interval);
}
@@ -362,7 +362,7 @@
//no left space in application data buffer->drop this packet
return;
}
-
+
//(6).number of detected lossed packet
/*(6.a).if the number of the current data packet is greater than LSRN+1,
put all the sequence numbers between (but excluding) these two values
@@ -378,9 +378,9 @@
receiverLossList.remove(currentSequenceNumber);
}
}
-
+
statistics.incNumberOfReceivedDataPackets();
-
+
//(7).Update the LRSN
if(currentSequenceNumber>largestReceivedSeqNumber){
largestReceivedSeqNumber=currentSequenceNumber;
@@ -410,7 +410,7 @@
protected void sendNAK(List<Long>sequenceNumbers)throws IOException{
if(sequenceNumbers.size()==0)return;
NegativeAcknowledgement nAckPacket= new NegativeAcknowledgement();
-
+
nAckPacket.addLossInfo(sequenceNumbers);
nAckPacket.setSession(session);
nAckPacket.setDestinationID(session.getDestination().getSocketID());
@@ -428,14 +428,14 @@
protected long sendAcknowledgment(long ackNumber)throws IOException{
Acknowledgement acknowledgmentPkt = buildLightAcknowledgement(ackNumber);
//set the estimate link capacity
- estimateLinkCapacity=(long)packetPairWindow.getEstimatedLinkCapacity();
+ estimateLinkCapacity=packetPairWindow.getEstimatedLinkCapacity();
acknowledgmentPkt.setEstimatedLinkCapacity(estimateLinkCapacity);
//set the packet arrival rate
packetArrivalSpeed=(long)packetHistoryWindow.getPacketArrivalSpeed();
acknowledgmentPkt.setPacketReceiveRate(packetArrivalSpeed);
-
+
endpoint.doSend(acknowledgmentPkt);
-
+
statistics.incNumberOfACKSent();
statistics.setPacketArrivalRate(packetArrivalSpeed, estimateLinkCapacity);
return acknowledgmentPkt.getAckSequenceNumber();
@@ -452,10 +452,10 @@
acknowledgmentPkt.setRoundTripTimeVar(roundTripTimeVar);
//set the buffer size
acknowledgmentPkt.setBufferSize(bufferSize);
-
+
acknowledgmentPkt.setDestinationID(session.getDestination().getSocketID());
acknowledgmentPkt.setSession(session);
-
+
return acknowledgmentPkt;
}
@@ -469,21 +469,24 @@
rtt) / 8. <br/>
4) Update RTTVar by: RTTVar = (RTTVar * 3 + abs(RTT - rtt)) / 4. <br/>
5) Update both ACK and NAK period to 4 * RTT + RTTVar + SYN. <br/>
- */
+ */
protected void onAck2PacketReceived(Acknowledgment2 ack2){
AckHistoryEntry entry=ackHistoryWindow.getEntry(ack2.getAckSequenceNumber());
if(entry!=null){
long ackNumber=entry.getAckNumber();
largestAcknowledgedAckNumber=Math.max(ackNumber, largestAcknowledgedAckNumber);
+
long rtt=entry.getAge();
- roundTripTime = (roundTripTime*7 + rtt)/8;
+ if(roundTripTime>0)roundTripTime = (roundTripTime*7 + rtt)/8;
+ else roundTripTime = rtt;
roundTripTimeVar = (roundTripTimeVar* 3 + Math.abs(roundTripTimeVar- rtt)) / 4;
+
ACK_INTERVAL=4*roundTripTime+roundTripTimeVar+Util.getSYNTime();
NAK_INTERVAL=ACK_INTERVAL;
statistics.setRTT(roundTripTime, roundTripTimeVar);
}
}
-
+
protected void sendKeepAlive()throws IOException{
KeepAlive ka=new KeepAlive();
ka.setDestinationID(session.getDestination().getSocketID());
@@ -504,12 +507,13 @@
nextEXP=Util.getCurrentTime()+EXP_INTERVAL;
expCount=0;
}
-
+
protected void onShutdown()throws IOException{
stop();
}
public void stop()throws IOException{
+ System.out.println("STOP");
stopped=true;
session.getSocket().close();
//stop our sender as well
@@ -522,5 +526,5 @@
sb.append("LossList: "+receiverLossList);
return sb.toString();
}
-
+
}
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -68,9 +68,9 @@
private final UDPEndPoint endpoint;
private final UDTSession session;
-
+
private final UDTStatistics statistics;
-
+
//sendLossList store the sequence numbers of lost packets
//feed back by the receiver through NAK pakets
private final SenderLossList senderLossList;
@@ -96,25 +96,31 @@
//last acknowledge number, initialised to the initial sequence number
private long lastAckSequenceNumber;
+ //instant when the last packet was sent
+ private long lastSentTime=0;
+
//size of the send queue
- public static final int MAX_SIZE=1024;
+ public final int sendQueueLength;
private volatile boolean stopped=false;
- private volatile AtomicReference<CountDownLatch> latchRef=new AtomicReference<CountDownLatch>();
-
+ private volatile AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>();
+
+ private volatile AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>();
+
public UDTSender(UDTSession session,UDPEndPoint endpoint){
+ if(!session.isReady())throw new IllegalStateException("UDTSession is not ready.");
this.endpoint= endpoint;
this.session=session;
- this.statistics=session.getStatistics();
- if(!session.isReady())throw new IllegalStateException("UDTSession is not ready.");
+ statistics=session.getStatistics();
+ sendQueueLength=64;//session.getFlowWindowSize();
senderLossList=new SenderLossList();
- sendBuffer=new ConcurrentHashMap<Long, DataPacket>(MAX_SIZE,0.75f,2);
- sendQueue = new LinkedBlockingQueue<DataPacket>(MAX_SIZE);
+ sendBuffer=new ConcurrentHashMap<Long, DataPacket>(sendQueueLength,0.75f,2);
+ sendQueue = new LinkedBlockingQueue<DataPacket>(sendQueueLength);
lastAckSequenceNumber=session.getInitialSequenceNumber();
-
- latchRef.set(new CountDownLatch(1));
+ waitForAckLatch.set(new CountDownLatch(1));
+ waitForSeqAckLatch.set(new CountDownLatch(1));
start();
}
@@ -130,6 +136,7 @@
ie.printStackTrace();
}
catch(IOException ex){
+ ex.printStackTrace();
logger.log(Level.SEVERE,"",ex);
}
logger.info("STOPPING SENDER for "+session);
@@ -195,20 +202,25 @@
}
protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{
- latchRef.get().countDown();
+ waitForAckLatch.get().countDown();
+ waitForSeqAckLatch.get().countDown();
+
CongestionControl cc=session.getCongestionControl();
- if(acknowledgement.getPacketReceiveRate()>0){
- long rtt=acknowledgement.getRoundTripTime();
+ long rtt=acknowledgement.getRoundTripTime();
+ if(rtt>0){
long rttVar=acknowledgement.getRoundTripTimeVar();
cc.setRTT(rtt,rttVar);
- long rate=acknowledgement.getPacketReceiveRate();
- long linkCapacity=acknowledgement.getEstimatedLinkCapacity();
- cc.setPacketArrivalRate(rate, linkCapacity);
statistics.setRTT(rtt, rttVar);
- statistics.setPacketArrivalRate(rate, linkCapacity);
}
- cc.onACK(acknowledgement.getAckNumber());
+ long rate=acknowledgement.getPacketReceiveRate();
+ if(rate>0){
+ long linkCapacity=acknowledgement.getEstimatedLinkCapacity();
+ cc.updatePacketArrivalRate(rate, linkCapacity);
+ statistics.setPacketArrivalRate(cc.getPacketArrivalRate(), cc.getEstimatedLinkCapacity());
+ }
+
long ackNumber=acknowledgement.getAckNumber();
+ cc.onACK(ackNumber);
//need to remove all sequence numbers up the ack number from the sendBuffer
boolean removed=false;
for(long s=lastAckSequenceNumber;s<ackNumber;s++){
@@ -224,7 +236,6 @@
sendAck2(ackNumber);
statistics.incNumberOfACKReceived();
statistics.storeParameters();
-
}
/**
@@ -232,9 +243,14 @@
* @param nak
*/
protected void onNAKPacketReceived(NegativeAcknowledgement nak){
+ waitForAckLatch.get().countDown();
+
for(Integer i: nak.getDecodedLossInfo()){
senderLossList.insert(new SenderLossListEntry(i));
}
+ if(logger.isLoggable(Level.FINER)){
+ logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets: "+nak.getDecodedLossInfo());
+ }
session.getCongestionControl().onNAK(nak.getDecodedLossInfo());
//reset EXP. EXP is in the receiver currently.... maybe move to SOCKET?
session.getSocket().getReceiver().resetEXPTimer();
@@ -253,7 +269,6 @@
protected void sendAck2(long ackSequenceNumber)throws IOException{
Acknowledgment2 ackOfAckPkt = new Acknowledgment2();
- ackOfAckPkt.setDestinationID(0L);
ackOfAckPkt.setAckSequenceNumber(ackSequenceNumber);
ackOfAckPkt.setSession(session);
ackOfAckPkt.setDestinationID(session.getDestination().getSocketID());
@@ -263,7 +278,6 @@
/**
* sender algorithm
*/
- long lastSentTime=0;
public void senderAlgorithm()throws InterruptedException, IOException{
//if the sender's loss list is not empty
SenderLossListEntry entry=senderLossList.getFirstEntry();
@@ -287,34 +301,37 @@
}catch (Exception e) {
logger.log(Level.WARNING,"",e);
}
+ // return;
}
- else {
- //if the number of unacknowledged data packets does not exceed the congestion
- //and the flow window sizes, pack a new packet
- int unAcknowledged=unacknowledged.get();
- if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
- && unAcknowledged<session.getFlowWindowSize()){
- double snd=session.getCongestionControl().getSendInterval();
- if(Util.getCurrentTime()-lastSentTime<snd){
- statistics.incNumberOfCCSlowDownEvents();
- return;
- }
- DataPacket dp=sendQueue.poll(100,TimeUnit.MILLISECONDS);
- if(dp!=null){
- lastSentTime=Util.getCurrentTime();
- send(dp);
- largestSentSequenceNumber=dp.getPacketSequenceNumber();
- }
- }else{
- //should we *really* wait for an ack?!
- if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){
- statistics.incNumberOfCCWindowExceededEvents();
- }
+ //if the number of unacknowledged data packets does not exceed the congestion
+ //and the flow window sizes, pack a new packet
+ int unAcknowledged=unacknowledged.get();
+ double snd=session.getCongestionControl().getSendInterval();
+ if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
+ && unAcknowledged<session.getFlowWindowSize()){
+ if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){
+ statistics.incNumberOfCCSlowDownEvents();
+ return;
}
- Thread.yield();
+ if(sendQueue.size()==0){
+ Thread.yield();
+ return;
+ }
+ DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS);
+ if(dp!=null){
+ send(dp);
+ lastSentTime=Util.getCurrentTime();
+ largestSentSequenceNumber=dp.getPacketSequenceNumber();
+ }
+ }else{
+ //should we *really* wait for an ack?!
+ if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){
+ statistics.incNumberOfCCWindowExceededEvents();
+ }
+ Thread.sleep(1);
+ //waitForAck();
}
-
}
/**
@@ -324,7 +341,6 @@
synchronized (sendLock) {
for(Long l: sendBuffer.keySet()){
senderLossList.insert(new SenderLossListEntry(l));
- logger.fine("NO ACK FOR "+l);
}
}
}
@@ -354,7 +370,7 @@
public long getLastAckSequenceNumber(){
return lastAckSequenceNumber;
}
-
+
boolean haveAcknowledgementFor(long sequenceNumber){
return sequenceNumber<=lastAckSequenceNumber;
}
@@ -366,19 +382,29 @@
boolean haveLostPackets(){
return senderLossList.isEmpty();
}
-
+
/**
- * wait for the next acknowledge
+ * wait until the given sequence number has been acknowledged
+ *
* @throws InterruptedException
*/
public synchronized void waitForAck(long sequenceNumber)throws InterruptedException{
while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){
- latchRef.set(new CountDownLatch(1));
- latchRef.get().await(10, TimeUnit.MILLISECONDS);
+ waitForSeqAckLatch.set(new CountDownLatch(1));
+ waitForSeqAckLatch.get().await(10, TimeUnit.MILLISECONDS);
}
}
-
+ /**
+ * wait for the next acknowledge
+ * @throws InterruptedException
+ */
+ public synchronized void waitForAck()throws InterruptedException{
+ waitForAckLatch.set(new CountDownLatch(1));
+ waitForAckLatch.get().await(1000, TimeUnit.MILLISECONDS);
+ }
+
+
public void stop(){
stopped=true;
}
Modified: udt-java/trunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -68,7 +68,7 @@
* flow window size, i.e. how many data packets are
* in-flight at a single time
*/
- protected int flowWindowSize=64;
+ protected int flowWindowSize=1024;
/**
* remote UDT entity (address and socket ID)
Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -170,6 +170,7 @@
int chunksize=session.getDatagramSize()-24;//need some bytes for the header
ByteBuffer bb=ByteBuffer.wrap(data,offset,length);
long seqNo=0;
+ int i=0;
while(bb.remaining()>0){
int len=Math.min(bb.remaining(),chunksize);
byte[]chunk=new byte[len];
@@ -182,8 +183,9 @@
packet.setData(chunk);
//put the packet into the send queue
while(!sender.sendUdtPacket(packet, timeout, units)){
- System.out.println("SOCKET WAIT");
+ Thread.sleep(1);
}
+ i++;
}
if(length>0)active=true;
}
Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -64,7 +64,7 @@
}
void decode(byte[]encodedData,int length){
- packetSequenceNumber =PacketUtil.decode(encodedData, 0);
+ packetSequenceNumber=PacketUtil.decode(encodedData, 0);
messageNumber=PacketUtil.decode(encodedData, 4);
timeStamp=PacketUtil.decode(encodedData, 8);
destinationID=PacketUtil.decode(encodedData, 12);
Modified: udt-java/trunk/src/main/java/udt/packets/PacketFactory.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -76,31 +76,31 @@
packet=new ConnectionHandshake(controlInformation);
}
//TYPE 0001:1
- if(ControlPacketType.KEEP_ALIVE.ordinal()==pktType){
+ else if(ControlPacketType.KEEP_ALIVE.ordinal()==pktType){
packet=new KeepAlive();
}
//TYPE 0010:2
- if(ControlPacketType.ACK.ordinal()==pktType){
+ else if(ControlPacketType.ACK.ordinal()==pktType){
packet=new Acknowledgement(controlInformation);
}
//TYPE 0011:3
- if(ControlPacketType.NAK.ordinal()==pktType){
+ else if(ControlPacketType.NAK.ordinal()==pktType){
packet=new NegativeAcknowledgement(controlInformation);
}
//TYPE 0101:5
- if(ControlPacketType.SHUTDOWN.ordinal()==pktType){
+ else if(ControlPacketType.SHUTDOWN.ordinal()==pktType){
packet=new Shutdown();
}
//TYPE 0110:6
- if(ControlPacketType.ACK2.ordinal()==pktType){
+ else if(ControlPacketType.ACK2.ordinal()==pktType){
packet=new Acknowledgment2(controlInformation);
}
//TYPE 0111:7
- if(ControlPacketType.MESSAGE_DROP_REQUEST.ordinal()==pktType){
+ else if(ControlPacketType.MESSAGE_DROP_REQUEST.ordinal()==pktType){
packet=new MessageDropRequest(controlInformation);
}
//TYPE 1111:8
- if(ControlPacketType.USER_DEFINED.ordinal()==pktType){
+ else if(ControlPacketType.USER_DEFINED.ordinal()==pktType){
packet=new UserDefined(controlInformation);
}
Modified: udt-java/trunk/src/main/java/udt/packets/PacketUtil.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -112,71 +112,5 @@
return result;
}
-
- /**
- * Gibt einen int-Wert zurueck, der das Byte im Bereich von 0 bis
- * 255 repraesentiert (negative Bytes werden umgewandelt)
- *@param b zu konvertierendes Byte
- *@return positiver Byte-Wert
- */
- public static int castPositive(byte b)
- {
- return b < 0 ? b + 256 : b;
- }
-
- /**
- * Die Methode erstellt aus dem gegebenen byte-array eine Dezimalzahl vom
- * Typ long, indem jedes p als Zahl zur Basis 256 interpretiert wird
- * (gelesen: stelle 0 = b[b.length-1] ...)
- *@param b umzuwandelndes Byte-Array
- *@return entsprechende long-Zahl
- */
- public static long toDecValue(byte[] b) {
- if(b != null)
- {
- long result = 0;
- for (int i = 0; i < b.length; i++) {
- long l = castPositive(b[i]);
- result += l * Math.pow(256, b.length - i - 1);
- }
- return result;
- }
- return 0;
- }
-
-// private Hilfsmethoden
- /*Hilfsmethode zur Darstellung des 2-stelligen hexadezimalen Werts von b als String.
- *Negative byte-Werte werden ueber einen int-cast (und "Betrag+127") in positive Zahlen konvertiert.
- *(Anomalie: grosse byte-Werte werden im IEEE-2er-Komplement dargestellt?)
- */
- public static String toHexString(byte b)
- {
- String hex = Integer.toHexString(castPositive(b)).toUpperCase();
- if(hex.length() == 1) hex = "0" + hex;
- return hex;
- }
-
- public static long convertByteArrayToLong(byte[] buffer) {
- if (buffer.length != 4) {
- throw new IllegalArgumentException("buffer length must be 4 bytes!");
- }
- long
- value = buffer[0] << 24;
- value |= buffer[1] << 16;
- value |= buffer[2] << 8;
- value |= buffer[3];
- return value;
- }
-
- public static byte[] convertIntToByteArray(int val) {
- byte[] buffer = new byte[4];
-
- buffer[0] = (byte) (val >> 24);
- buffer[1] = (byte) (val >> 16);
- buffer[2] = (byte) (val >> 8);
- buffer[3] = (byte) val;
-
- return buffer;
- }
-
+
}
Modified: udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -58,11 +58,27 @@
*/
public double computeMedianTimeInterval(){
int num=haveOverflow?max:Math.min(max, position);
- double total=0;
+ double median=0;
for(int i=0; i<num;i++){
- total+=circularArray.get(i).doubleValue();
+ median+=circularArray.get(i).doubleValue();
}
- return total/num;
+ median=median/num;
+ //median filtering
+ double upper=median*8;
+ double lower=median/8;
+ double total = 0;
+ double val=0;
+ int count=0;
+ for(int i=0; i<num;i++){
+ val=circularArray.get(i).doubleValue();
+ if(val<upper && val>lower){
+ total+=val;
+ count++;
+ }
+ }
+ median=total/count;
+ //System.out.println("median "+median);
+ return median;
}
/**
@@ -70,7 +86,8 @@
* packet pair window
* @return number of packets per second
*/
- public double getEstimatedLinkCapacity(){
- return 1e6/computeMedianTimeInterval();
+ public long getEstimatedLinkCapacity(){
+ long res=(long)Math.ceil(1000000/computeMedianTimeInterval());
+ return res;
}
}
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -55,15 +55,7 @@
}
public void remove(long seqNo){
- Iterator<SenderLossListEntry>iterator=backingList.iterator();
- while(iterator.hasNext()){
- SenderLossListEntry e=iterator.next();
- if(e.getSequenceNumber()==seqNo){
- iterator.remove();
- return;
- }
- }
- //backingList.remove(new SenderLossListEntry(seqNo));
+ backingList.remove(new SenderLossListEntry(seqNo));
}
/**
Modified: udt-java/trunk/src/main/java/udt/util/Util.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -47,7 +47,7 @@
public class Util {
/**
- * get the current system time in microseconds
+ * get the current timer value in microseconds
* @return
*/
public static long getCurrentTime(){
@@ -61,6 +61,11 @@
public static long getSYNTime(){
return 10000;
}
+
+ public static double getSYNTimeD(){
+ return 10000.0;
+ }
+
/**
* get the SYN time in seconds. The SYN time is 0.01 seconds = 10000 microseconds
* @return
Modified: udt-java/trunk/src/test/java/udt/NullCongestionControl.java
===================================================================
--- udt-java/trunk/src/test/java/udt/NullCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/test/java/udt/NullCongestionControl.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -39,10 +39,18 @@
public void onTimeout() {
}
- public void setPacketArrivalRate(long rate, long linkCapacity) {
+ public void updatePacketArrivalRate(long rate, long linkCapacity) {
}
public void setRTT(long rtt, long rttVar) {
}
+ public long getEstimatedLinkCapacity() {
+ return 0;
+ }
+
+ public long getPacketArrivalRate() {
+ return 0;
+ }
+
}
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -7,12 +7,10 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import udt.NullCongestionControl;
import udt.UDTClient;
import udt.UDTInputStream;
import udt.UDTReceiver;
import udt.UDTServerSocket;
-import udt.UDTSession;
import udt.UDTSocket;
import udt.UDTTestBase;
import udt.util.UDTStatistics;
@@ -22,7 +20,7 @@
boolean running=false;
//how many
- int num_packets=100;
+ int num_packets=300;
//how large is a single packet
int size=1*1024*1024;
@@ -33,8 +31,7 @@
public void test1()throws Exception{
Logger.getLogger("udt").setLevel(Level.INFO);
- System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName());
-
+ //System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName());
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
UDTReceiver.connectionExpiryDisabled=true;
@@ -81,7 +78,7 @@
System.out.println("MD5 hash of data received: "+md5_received);
System.out.println(client.getStatistics());
- assertEquals(md5_sent,md5_received);
+ //assertEquals(md5_sent,md5_received);
//store stat history to csv file
client.getStatistics().writeParameterHistory(File.createTempFile("/udtstats-",".csv"));
@@ -112,7 +109,7 @@
c=is.read(buf);
if(c<0)break;
else{
- md5.update(buf, 0, c);
+ //md5.update(buf, 0, c);
total+=c;
Thread.yield();
}
Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-16 21:05:13 UTC (rev 18)
+++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-20 20:17:07 UTC (rev 19)
@@ -6,15 +6,16 @@
import java.util.Random;
import junit.framework.TestCase;
+import udt.UDPEndPoint;
/**
* send some data over a UDP connection and measure performance
*/
public class UDPTest extends TestCase {
- final int num_packets=100*1000;
- final int packetSize=1500;
-
+ final int num_packets=5*1000;
+ final int packetSize=UDPEndPoint.DATAGRAM_SIZE;
+
public void test1()throws Exception{
runServer();
//client socket
@@ -42,11 +43,11 @@
System.out.println("Rate "+num_packets+" packets/sec");
System.out.println("Server received: "+total);
}
-
+
int N=0;
long total=0;
volatile boolean serverRunning=true;
-
+
private void runServer()throws Exception{
//server socket
final DatagramSocket serverSocket=new DatagramSocket(65321);
@@ -56,11 +57,15 @@
try{
byte[]buf=new byte[packetSize];
DatagramPacket dp=new DatagramPacket(buf,buf.length);
+ long start=System.currentTimeMillis();
while(true){
serverSocket.receive(dp);
total+=dp.getLength();
if(total==N)break;
}
+ long end=System.currentTimeMillis();
+ System.out.println("Server time: "+(end-start)+" ms.");
+
}
catch(Exception e){
e.printStackTrace();
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2010-04-16 21:05:20
|
Revision: 18
http://udt-java.svn.sourceforge.net/udt-java/?rev=18&view=rev
Author: bschuller
Date: 2010-04-16 21:05:13 +0000 (Fri, 16 Apr 2010)
Log Message:
-----------
cc made configurable; tried to optimize loss lists
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
udt-java/trunk/src/main/java/udt/UDTInputStream.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/UDTSession.java
udt-java/trunk/src/main/java/udt/UDTSocket.java
udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java
udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java
udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java
udt-java/trunk/src/main/java/udt/util/CircularArray.java
udt-java/trunk/src/main/java/udt/util/FlowWindow.java
udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
udt-java/trunk/src/test/java/udt/performance/UDPTest.java
Added Paths:
-----------
udt-java/trunk/src/main/java/udt/CongestionControl.java
udt-java/trunk/src/test/java/udt/NullCongestionControl.java
Added: udt-java/trunk/src/main/java/udt/CongestionControl.java
===================================================================
--- udt-java/trunk/src/main/java/udt/CongestionControl.java (rev 0)
+++ udt-java/trunk/src/main/java/udt/CongestionControl.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -0,0 +1,74 @@
+package udt;
+
+import java.util.List;
+
+/**
+ * congestion control interface
+ */
+public interface CongestionControl {
+
+ /**
+ * Callback function to be called (only) at the start of a UDT connection.
+ * when the UDT socket is conected
+ */
+ public abstract void init();
+
+ /**
+ * set roundtrip time and associated variance
+ */
+ public abstract void setRTT(long rtt, long rttVar);
+
+ /**
+ * set packet arrival rate and link capacity
+ * @param rate
+ * @param linkCapacity
+ */
+ public abstract void setPacketArrivalRate(long rate, long linkCapacity);
+
+ /**
+ * Inter-packet interval in seconds
+ * @return
+ */
+ public abstract double getSendInterval();
+
+ /**
+ * get the congestion window size
+ */
+ public abstract long getCongestionWindowSize();
+
+ /**
+ * Callback function to be called when an ACK packet is received.
+ * @param ackSeqno: the data sequence number acknowledged by this ACK.
+ * see spec. page(16-17)
+ */
+ public abstract void onACK(long ackSeqno);
+
+ /**
+ * Callback function to be called when a loss report is received.
+ * @param lossInfo:list of sequence number of packets, in the format describled in packet.cpp.
+ */
+ public abstract void onNAK(List<Integer> lossInfo);
+
+ /**
+ * Callback function to be called when a timeout event occurs
+ */
+ public abstract void onTimeout();
+
+ /**
+ * Callback function to be called when a data is sent.
+ * @param packetSeqNo: the data sequence number.
+ */
+ public abstract void onPacketSend(long packetSeqNo);
+
+ /**
+ * Callback function to be called when a data is received.
+ * @param packetSeqNo: the data sequence number.
+ */
+ public abstract void onPacketReceive(long packetSeqNo);
+
+ /**
+ * Callback function to be called when a UDT connection is closed.
+ */
+ public abstract void close();
+
+}
\ No newline at end of file
Property changes on: udt-java/trunk/src/main/java/udt/CongestionControl.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -55,8 +55,6 @@
/**
* the UDPEndpoint takes care of sending and receiving UDP network packets,
* dispatching them to the correct {@link UDTSession}
- *
- *
*/
public class UDPEndPoint {
@@ -83,7 +81,7 @@
//has the endpoint been stopped?
private volatile boolean stopped=false;
- public static final int DATAGRAM_SIZE=32768;
+ public static final int DATAGRAM_SIZE=1500;
/**
* bind to any local port on the given host address
Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -2,12 +2,10 @@
import java.util.List;
-import javax.swing.text.Utilities;
-
import udt.util.UDTStatistics;
import udt.util.Util;
-public class UDTCongestionControl {
+public class UDTCongestionControl implements CongestionControl {
private final UDTSession session;
private final UDTStatistics statistics;
@@ -64,26 +62,30 @@
init();
}
- /**
- * Callback function to be called (only) at the start of a UDT connection.
- * when the UDT socket is conected
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#init()
*/
public void init() {
}
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#setRTT(long, long)
+ */
public void setRTT(long rtt, long rttVar){
this.roundTripTime=rtt;
}
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#setPacketArrivalRate(long, long)
+ */
public void setPacketArrivalRate(long rate, long linkCapacity){
this.packetArrivalRate=rate;
this.estimatedLinkCapacity=linkCapacity;
}
- /**
- * Inter-packet interval in seconds
- * @return
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#getSendInterval()
*/
public double getSendInterval(){
return packetSendingPeriod ;
@@ -93,14 +95,12 @@
* congestionWindowSize
* @return
*/
- protected long getCongestionWindowSize(){
- return congestionWindowSize;
+ public long getCongestionWindowSize(){
+ return 2048;//congestionWindowSize;
}
- /**
- * Callback function to be called when an ACK packet is received.
- * @param ackSeqno: the data sequence number acknowledged by this ACK.
- * see spec. page(16-17)
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#onACK(long)
*/
public void onACK(long ackSeqno){
//the fixed size of a UDT packet
@@ -143,9 +143,8 @@
return inc;
}
- /**
- * Callback function to be called when a loss report is received.
- * @param lossInfo:list of sequence number of packets, in the format describled in packet.cpp.
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#onNAK(java.util.List)
*/
public void onNAK(List<Integer>lossInfo){
long firstBiggestlossSeqNo=lossInfo.get(lossInfo.size()-1);
@@ -197,24 +196,22 @@
}
}
- /**
- * Callback function to be called when a timeout event occurs
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#onTimeout()
*/
public void onTimeout(){}
- /**
- * Callback function to be called when a data is sent.
- * @param packetSeqNo: the data sequence number.
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#onPacketSend(long)
*/
public void onPacketSend(long packetSeqNo){}
- /**
- * Callback function to be called when a data is received.
- * @param packetSeqNo: the data sequence number.
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#onPacketReceive(long)
*/
public void onPacketReceive(long packetSeqNo){}
- /**
- * Callback function to be called when a UDT connection is closed.
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#close()
*/
public void close(){}
Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -60,7 +60,7 @@
private final UDTStatistics statistics;
//the highest sequence number read by the application
- private long highestSequenceNumber=-1;
+ private volatile long highestSequenceNumber=0;
//set to 'false' by the receiver when it gets a shutdown signal from the peer
//see the noMoreData() method
@@ -88,8 +88,8 @@
}
private int getFlowWindowSize(){
- if(socket!=null)return socket.getSession().getFlowWindowSize();
- else return 64;
+ if(socket!=null)return 2*socket.getSession().getFlowWindowSize();
+ else return 128;
}
/**
* create a new {@link UDTInputStream} connected to the given socket
@@ -221,6 +221,7 @@
*
*/
protected boolean haveNewData(long sequenceNumber,byte[]data)throws IOException{
+ if(sequenceNumber<=highestSequenceNumber)return true;
return appData.offer(new AppData(sequenceNumber,data));
}
@@ -270,6 +271,31 @@
public String toString(){
return sequenceNumber+"["+data.length+"]";
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + (int) (sequenceNumber ^ (sequenceNumber >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ AppData other = (AppData) obj;
+ if (sequenceNumber != other.sequenceNumber)
+ return false;
+ return true;
+ }
+
+
}
}
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -84,7 +84,7 @@
private long lastDataPacketArrivalTime=0;
//largest received data packet sequence number(LRSN)
- private volatile long largestReceivedSeqNumber=-1;
+ private volatile long largestReceivedSeqNumber=0;
//ACK event related
@@ -233,7 +233,6 @@
}
processUDTPacket(packet);
}
- //else System.out.println("no packet.");
Thread.yield();
}
@@ -337,7 +336,6 @@
protected void onDataPacketReceived(DataPacket dp)throws IOException{
long currentSequenceNumber = dp.getPacketSequenceNumber();
-
//check whether to drop this packet
n++;
if(dropRate>0 && n % dropRate == 0){
@@ -412,6 +410,7 @@
protected void sendNAK(List<Long>sequenceNumbers)throws IOException{
if(sequenceNumbers.size()==0)return;
NegativeAcknowledgement nAckPacket= new NegativeAcknowledgement();
+
nAckPacket.addLossInfo(sequenceNumbers);
nAckPacket.setSession(session);
nAckPacket.setDestinationID(session.getDestination().getSocketID());
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -40,6 +40,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -87,7 +88,7 @@
private final AtomicInteger unacknowledged=new AtomicInteger(0);
//for generating data packet sequence numbers
- private long nextSequenceNumber=-1;
+ private long nextSequenceNumber=0;
//the largest data packet sequence number that has actually been sent out
private volatile long largestSentSequenceNumber=-1;
@@ -100,6 +101,8 @@
private volatile boolean stopped=false;
+ private volatile AtomicReference<CountDownLatch> latchRef=new AtomicReference<CountDownLatch>();
+
public UDTSender(UDTSession session,UDPEndPoint endpoint){
this.endpoint= endpoint;
this.session=session;
@@ -110,6 +113,8 @@
sendBuffer=new ConcurrentHashMap<Long, DataPacket>(MAX_SIZE,0.75f,2);
sendQueue = new LinkedBlockingQueue<DataPacket>(MAX_SIZE);
lastAckSequenceNumber=session.getInitialSequenceNumber();
+
+ latchRef.set(new CountDownLatch(1));
start();
}
@@ -190,8 +195,8 @@
}
protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{
- if(latch!=null)latch.countDown();
- UDTCongestionControl cc=session.getCongestionControl();
+ latchRef.get().countDown();
+ CongestionControl cc=session.getCongestionControl();
if(acknowledgement.getPacketReceiveRate()>0){
long rtt=acknowledgement.getRoundTripTime();
long rttVar=acknowledgement.getRoundTripTimeVar();
@@ -230,8 +235,6 @@
for(Integer i: nak.getDecodedLossInfo()){
senderLossList.insert(new SenderLossListEntry(i));
}
- //update SND TODO
-
session.getCongestionControl().onNAK(nak.getDecodedLossInfo());
//reset EXP. EXP is in the receiver currently.... maybe move to SOCKET?
session.getSocket().getReceiver().resetEXPTimer();
@@ -297,7 +300,7 @@
statistics.incNumberOfCCSlowDownEvents();
return;
}
- DataPacket dp=sendQueue.poll(10,TimeUnit.MILLISECONDS);
+ DataPacket dp=sendQueue.poll(100,TimeUnit.MILLISECONDS);
if(dp!=null){
lastSentTime=Util.getCurrentTime();
send(dp);
@@ -309,8 +312,9 @@
statistics.incNumberOfCCWindowExceededEvents();
}
}
+ Thread.yield();
}
- Thread.yield();
+
}
/**
@@ -363,16 +367,14 @@
return senderLossList.isEmpty();
}
- private volatile CountDownLatch latch=null;
-
/**
* wait for the next acknowledge
* @throws InterruptedException
*/
- public void waitForAck(long sequenceNumber)throws InterruptedException{
- latch=new CountDownLatch(1);
+ public synchronized void waitForAck(long sequenceNumber)throws InterruptedException{
while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){
- latch.await(10, TimeUnit.MILLISECONDS);
+ latchRef.set(new CountDownLatch(1));
+ latchRef.get().await(10, TimeUnit.MILLISECONDS);
}
}
Modified: udt-java/trunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -62,13 +62,13 @@
protected int receiveBufferSize=64*32768;
- protected final UDTCongestionControl cc;
+ protected final CongestionControl cc;
/**
* flow window size, i.e. how many data packets are
* in-flight at a single time
*/
- protected int flowWindowSize=32;
+ protected int flowWindowSize=64;
/**
* remote UDT entity (address and socket ID)
@@ -84,6 +84,12 @@
public static final int DEFAULT_DATAGRAM_SIZE=UDPEndPoint.DATAGRAM_SIZE;
/**
+ * key for a system property defining the CC class to be used
+ * @see CongestionControl
+ */
+ public static final String CC_CLASS="udt.congestioncontrol.class";
+
+ /**
* Buffer size (i.e. datagram size)
* This is negotiated during connection setup
*/
@@ -99,7 +105,18 @@
statistics=new UDTStatistics(description);
mySocketID=nextSocketID.incrementAndGet();
this.destination=destination;
- cc=new UDTCongestionControl(this);
+
+ //init configurable CC
+ String clazzP=System.getProperty(CC_CLASS,UDTCongestionControl.class.getName());
+ Object ccObject=null;
+ try{
+ Class<?>clazz=Class.forName(clazzP);
+ ccObject=clazz.getDeclaredConstructor(UDTSession.class).newInstance(this);
+ }catch(Exception e){
+ ccObject=new UDTCongestionControl(this);
+ }
+ cc=(CongestionControl)ccObject;
+ System.out.println("using "+cc.getClass().getName());
}
public abstract void received(UDTPacket packet, Destination peer);
@@ -108,7 +125,7 @@
return socket;
}
- public UDTCongestionControl getCongestionControl() {
+ public CongestionControl getCongestionControl() {
return cc;
}
Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -182,7 +182,7 @@
packet.setData(chunk);
//put the packet into the send queue
while(!sender.sendUdtPacket(packet, timeout, units)){
- System.out.println("WAIT");
+ System.out.println("SOCKET WAIT");
}
}
if(length>0)active=true;
Modified: udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -109,7 +109,7 @@
*/
public void addLossInfo(long firstSequenceNumber, long lastSequenceNumber) {
//check if we really need an interval
- if(lastSequenceNumber-firstSequenceNumber==1){
+ if(lastSequenceNumber-firstSequenceNumber==0){
addLossInfo(firstSequenceNumber);
return;
}
Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -32,7 +32,7 @@
package udt.receiver;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
@@ -50,25 +50,18 @@
public ReceiverLossList(){
backingList = new PriorityBlockingQueue<ReceiverLossListEntry>(16);
- }
+ }
public void insert(ReceiverLossListEntry entry){
- backingList.add(entry);
+ synchronized (backingList) {
+ if(!backingList.contains(entry)){
+ backingList.add(entry);
+ }
+ }
}
-
- public void remove(ReceiverLossListEntry obj){
- backingList.remove(obj);
- }
public void remove(long seqNo){
- Iterator<ReceiverLossListEntry>iterator=backingList.iterator();
- while(iterator.hasNext()){
- ReceiverLossListEntry e=iterator.next();
- if(e.getSequenceNumber()==seqNo){
- iterator.remove();
- break;
- }
- }
+ backingList.remove(new ReceiverLossListEntry(seqNo));
}
public boolean contains(ReceiverLossListEntry obj){
@@ -102,8 +95,10 @@
public List<Long>getFilteredSequenceNumbers(long RTT, boolean doFeedback){
List<Long>result=new ArrayList<Long>();
long now=Util.getCurrentTime();
- for(ReceiverLossListEntry e: backingList){
- if( (now-e.getLastFeedbackTime())>2*RTT){
+ ReceiverLossListEntry[]sorted=backingList.toArray(new ReceiverLossListEntry[0]);
+ Arrays.sort(sorted);
+ for(ReceiverLossListEntry e: sorted){
+ if( (now-e.getLastFeedbackTime())>e.getK()*RTT){
result.add(e.getSequenceNumber());
if(doFeedback)e.feedback();
}
Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -48,7 +48,9 @@
* @param sequenceNumber
*/
public ReceiverLossListEntry(long sequenceNumber){
- if(sequenceNumber<=0)throw new IllegalArgumentException();
+ if(sequenceNumber<=0){
+ throw new IllegalArgumentException("Got sequence number "+sequenceNumber);
+ }
this.sequenceNumber = sequenceNumber;
this.lastFeedbacktime=Util.getCurrentTime();
}
@@ -90,4 +92,30 @@
return sequenceNumber+"[k="+k+",time="+lastFeedbacktime+"]";
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (k ^ (k >>> 32));
+ result = prime * result
+ + (int) (sequenceNumber ^ (sequenceNumber >>> 32));
+ return result;
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ReceiverLossListEntry other = (ReceiverLossListEntry) obj;
+ if (sequenceNumber != other.sequenceNumber)
+ return false;
+ return true;
+ }
+
}
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -49,7 +49,9 @@
}
public void insert(SenderLossListEntry obj){
- backingList.add(obj);
+ synchronized (backingList) {
+ if(!backingList.contains(obj))backingList.add(obj);
+ }
}
public void remove(long seqNo){
@@ -61,6 +63,7 @@
return;
}
}
+ //backingList.remove(new SenderLossListEntry(seqNo));
}
/**
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -71,4 +71,27 @@
return (int)(sequenceNumber-o.sequenceNumber);
}
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + (int) (sequenceNumber ^ (sequenceNumber >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ SenderLossListEntry other = (SenderLossListEntry) obj;
+ if (sequenceNumber != other.sequenceNumber)
+ return false;
+ return true;
+ }
+
}
Modified: udt-java/trunk/src/main/java/udt/util/CircularArray.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/CircularArray.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/util/CircularArray.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -35,18 +35,23 @@
import java.util.ArrayList;
import java.util.List;
+/**
+ * Circular array: the most recent value overwrites the oldest one if there is no more free
+ * space in the array
+ */
public class CircularArray<T>{
protected int position=0;
+
protected boolean haveOverflow=false;
- //the maximum number of entries
- protected int max=1;
- protected List<T>circularArray;
+ protected final int max;
+ protected final List<T>circularArray;
+
/**
- * ArrayList von T(object's type). The most recent value overwrite the oldest one
- * if no more free space in the array
+ * Create a new circularArray of the given size
+ *
* @param size
*/
public CircularArray(int size){
@@ -55,10 +60,7 @@
}
/**
- * Insert the specified entry at the specified position in this list.
- * the most recent value overwrite the oldest one
- * if no more free space in the circularArray
- * @param entry
+ * add an entry
*/
public void add(T entry){
if(position>=max){
@@ -74,7 +76,6 @@
/**
* Returns the number of elements in this list
- * @return
*/
public int size(){
return circularArray.size();
@@ -83,7 +84,5 @@
public String toString(){
return circularArray.toString();
}
-
-
-
+
}
Modified: udt-java/trunk/src/main/java/udt/util/FlowWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -74,6 +74,9 @@
*/
@Override
public boolean offer(E e) {
+ if(contains(e)){
+ return true;
+ }
if(size()<capacity){
return super.offer(e);
}else return false;
Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -33,7 +33,6 @@
package udt.util;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.security.MessageDigest;
Added: udt-java/trunk/src/test/java/udt/NullCongestionControl.java
===================================================================
--- udt-java/trunk/src/test/java/udt/NullCongestionControl.java (rev 0)
+++ udt-java/trunk/src/test/java/udt/NullCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -0,0 +1,48 @@
+package udt;
+
+import java.util.List;
+
+public class NullCongestionControl implements CongestionControl {
+
+ private final UDTSession session;
+
+ public NullCongestionControl(UDTSession session){
+ this.session=session;
+ }
+
+ public void close() {
+ }
+
+ public long getCongestionWindowSize() {
+ return Long.MAX_VALUE;
+ }
+
+ public double getSendInterval() {
+ return 0;
+ }
+
+ public void init() {
+ }
+
+ public void onACK(long ackSeqno) {
+ }
+
+ public void onNAK(List<Integer> lossInfo) {
+ }
+
+ public void onPacketReceive(long packetSeqNo) {
+ }
+
+ public void onPacketSend(long packetSeqNo) {
+ }
+
+ public void onTimeout() {
+ }
+
+ public void setPacketArrivalRate(long rate, long linkCapacity) {
+ }
+
+ public void setRTT(long rtt, long rttVar) {
+ }
+
+}
Property changes on: udt-java/trunk/src/test/java/udt/NullCongestionControl.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -7,10 +7,12 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import udt.NullCongestionControl;
import udt.UDTClient;
import udt.UDTInputStream;
import udt.UDTReceiver;
import udt.UDTServerSocket;
+import udt.UDTSession;
import udt.UDTSocket;
import udt.UDTTestBase;
import udt.util.UDTStatistics;
@@ -20,7 +22,8 @@
boolean running=false;
//how many
- int num_packets=200;
+ int num_packets=100;
+
//how large is a single packet
int size=1*1024*1024;
@@ -30,6 +33,7 @@
public void test1()throws Exception{
Logger.getLogger("udt").setLevel(Level.INFO);
+ System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName());
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -3,20 +3,17 @@
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
-import java.nio.ByteBuffer;
import java.util.Random;
import junit.framework.TestCase;
/**
* send some data over a UDP connection and measure performance
- *
*/
public class UDPTest extends TestCase {
- final int BUFSIZE=32768;
- final int num_packets=10*1000;
- final int packetSize=1024;
+ final int num_packets=100*1000;
+ final int packetSize=1500;
public void test1()throws Exception{
runServer();
@@ -25,20 +22,15 @@
//generate a test array with random content
N=num_packets*packetSize;
- byte[]data=new byte[N];
+ byte[]data=new byte[packetSize];
new Random().nextBytes(data);
long start=System.currentTimeMillis();
- ByteBuffer bb=ByteBuffer.wrap(data);
- DatagramPacket dp=new DatagramPacket(new byte[BUFSIZE],BUFSIZE);
+ DatagramPacket dp=new DatagramPacket(new byte[packetSize],packetSize);
dp.setAddress(InetAddress.getByName("localhost"));
dp.setPort(65321);
-
- System.out.println("Sending data block of <"+N+"> bytes");
- while(bb.remaining()>0){
- int len=Math.min(bb.remaining(),BUFSIZE);
- byte[]chunk=new byte[len];
- bb.get(chunk);
- dp.setData(chunk);
+ System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes");
+ for(int i=0;i<num_packets;i++){
+ dp.setData(data);
s.send(dp);
}
System.out.println("Finished sending.");
@@ -46,7 +38,8 @@
System.out.println("Server stopped.");
long end=System.currentTimeMillis();
System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms");
- System.out.println("Rate "+N/(end-start)+" Kbytes/sec");
+ System.out.println("Rate "+N/1000/(end-start)+" Mbytes/sec");
+ System.out.println("Rate "+num_packets+" packets/sec");
System.out.println("Server received: "+total);
}
@@ -61,7 +54,7 @@
Runnable serverProcess=new Runnable(){
public void run(){
try{
- byte[]buf=new byte[BUFSIZE];
+ byte[]buf=new byte[packetSize];
DatagramPacket dp=new DatagramPacket(buf,buf.length);
while(true){
serverSocket.receive(dp);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|