udt-java-commits Mailing List for UDT-Java (Page 3)
Status: Alpha
Brought to you by:
bschuller
You can subscribe to this list here.
2010 |
Jan
|
Feb
|
Mar
|
Apr
(17) |
May
(4) |
Jun
(1) |
Jul
|
Aug
(3) |
Sep
(7) |
Oct
|
Nov
(1) |
Dec
|
---|---|---|---|---|---|---|---|---|---|---|---|---|
2011 |
Jan
(1) |
Feb
(1) |
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
(10) |
Sep
|
Oct
(1) |
Nov
(1) |
Dec
(2) |
2012 |
Jan
|
Feb
(4) |
Mar
(1) |
Apr
|
May
(1) |
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: <bsc...@us...> - 2010-04-21 20:36:43
|
Revision: 22 http://udt-java.svn.sourceforge.net/udt-java/?rev=22&view=rev Author: bschuller Date: 2010-04-21 20:36:36 +0000 (Wed, 21 Apr 2010) Log Message: ----------- Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-21 15:45:00 UTC (rev 21) +++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-21 20:36:36 UTC (rev 22) @@ -7,62 +7,61 @@ import udt.util.UDTStatistics; import udt.util.Util; +/** + * default UDT congestion control.<br/> + * + * The algorithm is adapted from the C++ reference implementation. + */ public class UDTCongestionControl implements CongestionControl { private static final Logger logger=Logger.getLogger(UDTCongestionControl.class.getName()); private final UDTSession session; - + private final UDTStatistics statistics; - + //round trip time in microseconds private long roundTripTime=2*Util.getSYNTime(); - + //rate in packets per second private long packetArrivalRate=0; - + //link capacity in packets per second private long estimatedLinkCapacity=0; - - long maxControlWindowSize=128; // Packet sending period = packet send interval, in microseconds private double packetSendingPeriod=1; + // Congestion window size, in packets private long congestionWindowSize=16; - - //number of packets to be increased in the next SYN period - private double numOfIncreasingPacket; - + //last rate increase time (microsecond value) long lastRateIncreaseTime=Util.getCurrentTime(); - + /*if in slow start phase*/ boolean slowStartPhase=true; - + /*last ACKed seq no*/ long lastAckSeqNumber=-1; - + /*max packet seq. no. sent out when last decrease happened*/ private long lastDecreaseSeqNo; - //value of packetSendPeriod when last decrease happened - long lastDecreasePeriod; - //NAK counter - long nACKCount=1; - + private long nACKCount=1; + //number of decreases in a congestion epoch long decCount=1; - + //random threshold on decrease by number of loss events long decreaseRandom=1; - + //average number of NAKs per congestion long averageNACKNum; - boolean loss=false; - + //this flag avoids immediate rate increase after a NAK + private boolean loss=false; + public UDTCongestionControl(UDTSession session){ this.session=session; this.statistics=session.getStatistics(); @@ -108,7 +107,7 @@ public double getSendInterval(){ return packetSendingPeriod; } - + /** * congestionWindowSize * @return @@ -125,7 +124,7 @@ if(slowStartPhase){ congestionWindowSize+=ackSeqno-lastAckSeqNumber; lastAckSeqNumber = ackSeqno; - + //but not beyond a maximum size if(congestionWindowSize>session.getFlowWindowSize()){ System.out.println("slow start ends on ACK"); @@ -137,7 +136,7 @@ packetSendingPeriod=(double)congestionWindowSize/(roundTripTime+Util.getSYNTimeD()); } } - + }else{ //1.if it is not in slow start phase,set the congestion window size //to the product of packet arrival rate and(rtt +SYN) @@ -148,51 +147,51 @@ } } - //no rate increase in slow start + //no rate increase during slow start if(slowStartPhase)return; - + + //no rate increase "immediately" after a NAK if(loss){ loss=false; return; } - - //4.compute the number of sent packets to be increase in the next SYN period - //and update the send intervall - numOfIncreasingPacket=computeNumOfIncreasingPacket(); - - //4.update the send period : + + //4. compute the increase in sent packets for the next SYN period + double numOfIncreasingPacket=computeNumOfIncreasingPacket(); + + //5. update the send period double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD()); packetSendingPeriod=factor*packetSendingPeriod; //packetSendingPeriod=0.995*packetSendingPeriod; //System.out.println("dec snd factor "+factor+" to "+packetSendingPeriod); - + statistics.setSendPeriod(packetSendingPeriod); } private final long PS=UDPEndPoint.DATAGRAM_SIZE; private final double BetaDivPS=0.0000015/PS; - + //see spec page 16 private double computeNumOfIncreasingPacket (){ - //link capacity and sending speed, in packets per second - double B=estimatedLinkCapacity; - double C=1000000.0/packetSendingPeriod; - - if(B<=C)return 1.0/UDPEndPoint.DATAGRAM_SIZE; - - double exp=Math.ceil(Math.log10((B-C)*PS*8)); - double power10 = Math.pow( 10.0, exp)* BetaDivPS; - double inc = Math.max(power10, 1/PS); - return inc; + //difference in link capacity and sending speed, in packets per second + double remaining=estimatedLinkCapacity-1000000.0/packetSendingPeriod; + + if(remaining<=0){ + return 1.0/UDPEndPoint.DATAGRAM_SIZE; + } + else{ + double exp=Math.ceil(Math.log10(remaining*PS*8)); + double power10 = Math.pow( 10.0, exp)* BetaDivPS; + return Math.max(power10, 1/PS); + } } - + /* (non-Javadoc) * @see udt.CongestionControl#onNAK(java.util.List) */ public void onNAK(List<Integer>lossInfo){ loss=true; - long firstBiggestlossSeqNo=lossInfo.get(lossInfo.size()-1); - + long firstBiggestlossSeqNo=lossInfo.get(0); nACKCount++; /*1) If it is in slow start phase, set inter-packet interval to 1/recvrate. Slow start ends. Stop. */ @@ -206,12 +205,10 @@ slowStartPhase = false; return; } - + long currentMaxSequenceNumber=session.getSocket().getSender().getCurrentSequenceNumber(); - // 2)If this NAK starts a new congestion epoch if(firstBiggestlossSeqNo>lastDecreaseSeqNo){ - // -increase inter-packet interval packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125); // -Update AvgNAKNum(the average number of NAKs per congestion) @@ -224,20 +221,19 @@ // -Update LastDecSeq lastDecreaseSeqNo = currentMaxSequenceNumber; // -Stop. - statistics.setSendPeriod(packetSendingPeriod); } - //* 3) If DecCount <= 5, and NAKCount == DecCount * DecRandom: - if(decCount<=5 && nACKCount==decCount*decreaseRandom){ + else if(decCount<=5 && nACKCount==decCount*decreaseRandom){ // a. Update SND period: SND = SND * 1.125; packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125); // b. Increase DecCount by 1; decCount++; // c. Record the current largest sent sequence number (LastDecSeq). lastDecreaseSeqNo= currentMaxSequenceNumber; - statistics.setSendPeriod(packetSendingPeriod); - return; } + + statistics.setSendPeriod(packetSendingPeriod); + return; } /* (non-Javadoc) Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-21 15:45:00 UTC (rev 21) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-21 20:36:36 UTC (rev 22) @@ -253,10 +253,10 @@ statistics.incNumberOfNAKReceived(); statistics.storeParameters(); - //if(logger.isLoggable(Level.FINER)){ - System.out.println("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, " + if(logger.isLoggable(Level.FINER)){ + logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, " +"set send period to "+session.getCongestionControl().getSendInterval()); - //} + } return; } @@ -315,12 +315,9 @@ if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() && unAcknowledged<session.getFlowWindowSize()){ -// if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){ -// statistics.incNumberOfCCSlowDownEvents(); -// return; -// } + if(sendQueue.size()==0){ - Thread.yield(); + //Thread.yield(); return; } DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS); @@ -340,10 +337,12 @@ //wait double snd=session.getCongestionControl().getSendInterval(); long passed=Util.getCurrentTime()-iterationStart; + int x=0; while(snd-passed>0){ - //busy wait, but we cannot wait with microsecond precision + if(x++==0)statistics.incNumberOfCCSlowDownEvents(); + //we cannot wait with microsecond precision if(snd-passed>750)Thread.sleep(1); - statistics.incNumberOfCCSlowDownEvents(); + else Thread.yield(); passed=Util.getCurrentTime()-iterationStart; } @@ -394,8 +393,9 @@ return largestSentSequenceNumber>=sequenceNumber; } + boolean haveLostPackets(){ - return senderLossList.isEmpty(); + return !senderLossList.isEmpty(); } /** Modified: udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-21 15:45:00 UTC (rev 21) +++ udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-21 20:36:36 UTC (rev 22) @@ -79,7 +79,6 @@ } } double res=total/count; - //System.out.println("median: "+median+" filtered "+res); return res; } Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-21 15:45:00 UTC (rev 21) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-21 20:36:36 UTC (rev 22) @@ -31,7 +31,6 @@ *********************************************************************************/ package udt.sender; -import java.util.Iterator; import java.util.concurrent.PriorityBlockingQueue; /** Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-21 15:45:00 UTC (rev 21) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-21 20:36:36 UTC (rev 22) @@ -20,7 +20,7 @@ boolean running=false; //how many - int num_packets=200; + int num_packets=100; //how large is a single packet int size=1*1024*1024; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2010-04-21 15:45:06
|
Revision: 21 http://udt-java.svn.sourceforge.net/udt-java/?rev=21&view=rev Author: bschuller Date: 2010-04-21 15:45:00 +0000 (Wed, 21 Apr 2010) Log Message: ----------- rate control stuff --- still not so great IMO Modified Paths: -------------- udt-java/trunk/src/main/java/udt/ClientSession.java udt-java/trunk/src/main/java/udt/UDTCongestionControl.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Modified: udt-java/trunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ClientSession.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/main/java/udt/ClientSession.java 2010-04-21 15:45:00 UTC (rev 21) @@ -107,8 +107,6 @@ } active = true; try{ - //packet received means we should not yet expire - socket.getReceiver().resetEXPTimer(); if(packet.forSender()){ socket.getSender().receive(lastPacket); }else{ Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-21 15:45:00 UTC (rev 21) @@ -1,12 +1,16 @@ package udt; import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; import udt.util.UDTStatistics; import udt.util.Util; public class UDTCongestionControl implements CongestionControl { + private static final Logger logger=Logger.getLogger(UDTCongestionControl.class.getName()); + private final UDTSession session; private final UDTStatistics statistics; @@ -49,14 +53,16 @@ long nACKCount=1; //number of decreases in a congestion epoch - long congestionEpochDecreaseCount=1; + long decCount=1; //random threshold on decrease by number of loss events - long decreaseRandom; + long decreaseRandom=1; //average number of NAKs per congestion long averageNACKNum; + boolean loss=false; + public UDTCongestionControl(UDTSession session){ this.session=session; this.statistics=session.getStatistics(); @@ -115,41 +121,65 @@ * @see udt.CongestionControl#onACK(long) */ public void onACK(long ackSeqno){ - //the fixed size of a UDT packet - long maxSegmentSize=UDPEndPoint.DATAGRAM_SIZE; - - //1.if it is in slow start phase,set the congestion window size - //to the product of packet arrival rate and(rtt +SYN) - double A=packetArrivalRate*(roundTripTime+Util.getSYNTime()); - //System.out.println("rate "+packetArrivalRate+" rtt "+roundTripTime+" A: "+A); + //increase window during slow start if(slowStartPhase){ - congestionWindowSize=16; - slowStartPhase=false; - return; + congestionWindowSize+=ackSeqno-lastAckSeqNumber; + lastAckSeqNumber = ackSeqno; + + //but not beyond a maximum size + if(congestionWindowSize>session.getFlowWindowSize()){ + System.out.println("slow start ends on ACK"); + slowStartPhase=false; + if(packetArrivalRate>0){ + packetSendingPeriod=1000000.0/packetArrivalRate; + } + else{ + packetSendingPeriod=(double)congestionWindowSize/(roundTripTime+Util.getSYNTimeD()); + } + } + }else{ + //1.if it is not in slow start phase,set the congestion window size + //to the product of packet arrival rate and(rtt +SYN) + double A=packetArrivalRate/1000000.0*(roundTripTime+Util.getSYNTimeD()); congestionWindowSize=(long)A+16; + if(logger.isLoggable(Level.FINER)){ + logger.finer("receive rate "+packetArrivalRate+" rtt "+roundTripTime+" set to window size: "+(A+16)); + } } + //no rate increase in slow start + if(slowStartPhase)return; + + if(loss){ + loss=false; + return; + } + //4.compute the number of sent packets to be increase in the next SYN period //and update the send intervall - if(estimatedLinkCapacity<= packetArrivalRate){ - numOfIncreasingPacket= 1.0/maxSegmentSize; - }else{ - numOfIncreasingPacket=computeNumOfIncreasingPacket(); - } + numOfIncreasingPacket=computeNumOfIncreasingPacket(); + //4.update the send period : double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD()); - packetSendingPeriod=packetSendingPeriod*factor; + packetSendingPeriod=factor*packetSendingPeriod; + //packetSendingPeriod=0.995*packetSendingPeriod; + //System.out.println("dec snd factor "+factor+" to "+packetSendingPeriod); + statistics.setSendPeriod(packetSendingPeriod); } + private final long PS=UDPEndPoint.DATAGRAM_SIZE; + private final double BetaDivPS=0.0000015/PS; + //see spec page 16 - final double BetaDivPS=0.0000015/UDPEndPoint.DATAGRAM_SIZE; private double computeNumOfIncreasingPacket (){ - long B=estimatedLinkCapacity; - double C=1.0/packetSendingPeriod; - if(B<=C)return C; - long PS=UDPEndPoint.DATAGRAM_SIZE; + //link capacity and sending speed, in packets per second + double B=estimatedLinkCapacity; + double C=1000000.0/packetSendingPeriod; + + if(B<=C)return 1.0/UDPEndPoint.DATAGRAM_SIZE; + double exp=Math.ceil(Math.log10((B-C)*PS*8)); double power10 = Math.pow( 10.0, exp)* BetaDivPS; double inc = Math.max(power10, 1/PS); @@ -160,49 +190,49 @@ * @see udt.CongestionControl#onNAK(java.util.List) */ public void onNAK(List<Integer>lossInfo){ + loss=true; long firstBiggestlossSeqNo=lossInfo.get(lossInfo.size()-1); - long currentMaxSequenceNumber=session.getSocket().getSender().getCurrentSequenceNumber(); - lastAckSeqNumber = currentMaxSequenceNumber; + nACKCount++; /*1) If it is in slow start phase, set inter-packet interval to 1/recvrate. Slow start ends. Stop. */ if(slowStartPhase){ if(packetArrivalRate>0){ - packetSendingPeriod = 1e6/packetArrivalRate; + packetSendingPeriod = 100000.0/packetArrivalRate; } else{ - packetSendingPeriod=congestionWindowSize*(roundTripTime+Util.getSYNTime()); + packetSendingPeriod=congestionWindowSize/(roundTripTime+Util.getSYNTime()); } slowStartPhase = false; return; } - - //start new congestion epoch + long currentMaxSequenceNumber=session.getSocket().getSender().getCurrentSequenceNumber(); + + // 2)If this NAK starts a new congestion epoch if(firstBiggestlossSeqNo>lastDecreaseSeqNo){ - // 2)If this NAK starts a new congestion epoch + // -increase inter-packet interval packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125); // -Update AvgNAKNum(the average number of NAKs per congestion) averageNACKNum = (int)Math.ceil(averageNACKNum*0.875 + nACKCount*0.125); - // -reset NAKCount to 1, + // -reset NAKCount and DecCount to 1, nACKCount=1; - /* - compute DecRandom to a random (average distribution) number between 1 and AvgNAKNum.. */ + decCount=1; + /* - compute DecRandom to a random (average distribution) number between 1 and AvgNAKNum */ decreaseRandom =(int)Math.ceil((averageNACKNum-1)*Math.random()+1); // -Update LastDecSeq lastDecreaseSeqNo = currentMaxSequenceNumber; // -Stop. statistics.setSendPeriod(packetSendingPeriod); - return; } //* 3) If DecCount <= 5, and NAKCount == DecCount * DecRandom: - if(congestionEpochDecreaseCount<=5 && - nACKCount==congestionEpochDecreaseCount*decreaseRandom){ + if(decCount<=5 && nACKCount==decCount*decreaseRandom){ // a. Update SND period: SND = SND * 1.125; packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125); // b. Increase DecCount by 1; - congestionEpochDecreaseCount++; + decCount++; // c. Record the current largest sent sequence number (LastDecSeq). lastDecreaseSeqNo= currentMaxSequenceNumber; statistics.setSendPeriod(packetSendingPeriod); Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-21 15:45:00 UTC (rev 21) @@ -110,7 +110,7 @@ long packetArrivalSpeed; //round trip time, calculated from ACK/ACK2 pairs - long roundTripTime=10*Util.getSYNTime(); + long roundTripTime=50*1000; //round trip time variance long roundTripTimeVar=roundTripTime/2; @@ -125,8 +125,13 @@ private long nextEXP; //microseconds to next EXP event - private long EXP_INTERVAL=10*Util.getSYNTime(); + private long EXP_INTERVAL=Util.getSYNTime(); + //instant when the session was created (for expiry checking) + private final long sessionUpSince; + //milliseconds to timeout a new session that stays idle + private final long IDLE_TIMEOUT = 3*60*1000; + //buffer size for storing data private final long bufferSize; @@ -150,6 +155,7 @@ public UDTReceiver(UDTSession session,UDPEndPoint endpoint){ this.endpoint = endpoint; this.session=session; + this.sessionUpSince=System.currentTimeMillis(); this.statistics=session.getStatistics(); if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); ackHistoryWindow = new AckHistoryWindow(16); @@ -295,7 +301,7 @@ UDTSender sender=session.getSocket().getSender(); //put all the unacknowledged packets in the senders loss list sender.putUnacknowledgedPacketsIntoLossList(); - if(expCount>16){ + if(expCount>16 && System.currentTimeMillis()-sessionUpSince > IDLE_TIMEOUT){ if(!connectionExpiryDisabled &&!stopped){ sendShutdown(); stop(); @@ -508,6 +514,10 @@ expCount=0; } + protected void resetEXPCount(){ + expCount=0; + } + protected void onShutdown()throws IOException{ stop(); } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-21 15:45:00 UTC (rev 21) @@ -96,9 +96,6 @@ //last acknowledge number, initialised to the initial sequence number private long lastAckSequenceNumber; - //instant when the last packet was sent - private long lastSentTime=0; - //size of the send queue public final int sendQueueLength; @@ -199,6 +196,9 @@ NegativeAcknowledgement nak=(NegativeAcknowledgement)p; onNAKPacketReceived(nak); } + else if (p instanceof KeepAlive) { + session.getSocket().getReceiver().resetEXPCount(); + } } protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{ @@ -248,14 +248,16 @@ for(Integer i: nak.getDecodedLossInfo()){ senderLossList.insert(new SenderLossListEntry(i)); } - if(logger.isLoggable(Level.FINER)){ - logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets: "+nak.getDecodedLossInfo()); - } session.getCongestionControl().onNAK(nak.getDecodedLossInfo()); - //reset EXP. EXP is in the receiver currently.... maybe move to SOCKET? session.getSocket().getReceiver().resetEXPTimer(); statistics.incNumberOfNAKReceived(); statistics.storeParameters(); + + //if(logger.isLoggable(Level.FINER)){ + System.out.println("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, " + +"set send period to "+session.getCongestionControl().getSendInterval()); + //} + return; } @@ -279,16 +281,19 @@ * sender algorithm */ public void senderAlgorithm()throws InterruptedException, IOException{ + long iterationStart=Util.getCurrentTime(); //if the sender's loss list is not empty SenderLossListEntry entry=senderLossList.getFirstEntry(); if (entry!=null) { long seqNumber = entry.getSequenceNumber(); + + //TODO //if the current seqNumber is 16n,check the timeOut in the //loss list and send a message drop request. - if((seqNumber%16)==0){ - //TODO + //if((seqNumber%16)==0){ //sendLossList.checkTimeOut(timeToLive); - } + //} + try { //retransmit the packet with the first entry in the list //as sequence number and remove it from the list @@ -307,13 +312,13 @@ //if the number of unacknowledged data packets does not exceed the congestion //and the flow window sizes, pack a new packet int unAcknowledged=unacknowledged.get(); - double snd=session.getCongestionControl().getSendInterval(); + if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() && unAcknowledged<session.getFlowWindowSize()){ - if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){ - statistics.incNumberOfCCSlowDownEvents(); - return; - } +// if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){ +// statistics.incNumberOfCCSlowDownEvents(); +// return; +// } if(sendQueue.size()==0){ Thread.yield(); return; @@ -321,7 +326,6 @@ DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS); if(dp!=null){ send(dp); - lastSentTime=Util.getCurrentTime(); largestSentSequenceNumber=dp.getPacketSequenceNumber(); } }else{ @@ -332,6 +336,17 @@ Thread.sleep(1); //waitForAck(); } + + //wait + double snd=session.getCongestionControl().getSendInterval(); + long passed=Util.getCurrentTime()-iterationStart; + while(snd-passed>0){ + //busy wait, but we cannot wait with microsecond precision + if(snd-passed>750)Thread.sleep(1); + statistics.incNumberOfCCSlowDownEvents(); + passed=Util.getCurrentTime()-iterationStart; + } + } /** Modified: udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-21 15:45:00 UTC (rev 21) @@ -35,7 +35,8 @@ import udt.util.CircularArray; /** - * a circular array that records time intervals between two data packets + * a circular array that records time intervals between two probing data packets. + * It is used to determine the estimated link capacity. * @see {@link CircularArray} * */ @@ -63,6 +64,7 @@ median+=circularArray.get(i).doubleValue(); } median=median/num; + //median filtering double upper=median*8; double lower=median/8; @@ -76,9 +78,9 @@ count++; } } - median=total/count; - //System.out.println("median "+median); - return median; + double res=total/count; + //System.out.println("median: "+median+" filtered "+res); + return res; } /** Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-21 15:45:00 UTC (rev 21) @@ -18,6 +18,8 @@ }while(!serverStarted); File f=new File("src/test/java/datafile"); + f=new File("/tmp/200MB"); + File tmp=File.createTempFile("udtest-", null); String[] args=new String[]{"localhost","65321",f.getAbsolutePath(),tmp.getAbsolutePath()}; Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-21 15:45:00 UTC (rev 21) @@ -20,7 +20,7 @@ boolean running=false; //how many - int num_packets=300; + int num_packets=200; //how large is a single packet int size=1*1024*1024; @@ -34,7 +34,7 @@ //System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName()); UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; - UDTReceiver.connectionExpiryDisabled=true; + //UDTReceiver.connectionExpiryDisabled=true; doTest(); } @@ -109,7 +109,7 @@ c=is.read(buf); if(c<0)break; else{ - //md5.update(buf, 0, c); + md5.update(buf, 0, c); total+=c; Thread.yield(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2010-04-20 21:21:21
|
Revision: 20 http://udt-java.svn.sourceforge.net/udt-java/?rev=20&view=rev Author: bschuller Date: 2010-04-20 21:21:15 +0000 (Tue, 20 Apr 2010) Log Message: ----------- disable connection expiry Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java udt-java/trunk/src/main/java/udt/util/SendFile.java udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-20 20:17:07 UTC (rev 19) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-20 21:21:15 UTC (rev 20) @@ -125,7 +125,7 @@ private long nextEXP; //microseconds to next EXP event - private long EXP_INTERVAL=2*Util.getSYNTime(); + private long EXP_INTERVAL=10*Util.getSYNTime(); //buffer size for storing data private final long bufferSize; @@ -513,7 +513,6 @@ } public void stop()throws IOException{ - System.out.println("STOP"); stopped=true; session.getSocket().close(); //stop our sender as well Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-20 20:17:07 UTC (rev 19) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-20 21:21:15 UTC (rev 20) @@ -40,6 +40,7 @@ import udt.UDTClient; import udt.UDTInputStream; import udt.UDTOutputStream; +import udt.UDTReceiver; /** * helper class for receiving a single file via UDT @@ -66,6 +67,7 @@ public void run(){ configure(); try{ + UDTReceiver.connectionExpiryDisabled=true; InetAddress myHost=localIP!=null?InetAddress.getByName(localIP):InetAddress.getLocalHost(); UDTClient client=localPort!=-1?new UDTClient(myHost,localPort):new UDTClient(myHost); client.connect(serverHost, serverPort); Modified: udt-java/trunk/src/main/java/udt/util/SendFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-20 20:17:07 UTC (rev 19) +++ udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-20 21:21:15 UTC (rev 20) @@ -42,6 +42,7 @@ import udt.UDTInputStream; import udt.UDTOutputStream; +import udt.UDTReceiver; import udt.UDTServerSocket; import udt.UDTSocket; import udt.packets.PacketUtil; @@ -72,6 +73,7 @@ public void run(){ configure(); try{ + UDTReceiver.connectionExpiryDisabled=true; InetAddress myHost=localIP!=null?InetAddress.getByName(localIP):InetAddress.getLocalHost(); UDTServerSocket server=new UDTServerSocket(myHost,serverPort); while(true){ @@ -134,7 +136,7 @@ bb.get(fileName); File file=new File(new String(fileName)); - System.out.println("[SendFile] File requested: "+file.getPath()); + System.out.println("[SendFile] File requested: '"+file.getPath()+"'"); FileInputStream fis=new FileInputStream(file); try{ Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-20 20:17:07 UTC (rev 19) +++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-20 21:21:15 UTC (rev 20) @@ -13,7 +13,10 @@ public void test1()throws Exception{ runServer(); - while(!serverStarted)Thread.sleep(100); + do{ + Thread.sleep(500); + }while(!serverStarted); + File f=new File("src/test/java/datafile"); File tmp=File.createTempFile("udtest-", null); @@ -28,9 +31,9 @@ private void runServer(){ Runnable r=new Runnable(){ public void run(){ - serverStarted=true; String []args=new String[]{"65321"}; try{ + serverStarted=true; SendFile.main(args); }catch(Exception ex){ ex.printStackTrace(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2010-04-20 20:17:14
|
Revision: 19 http://udt-java.svn.sourceforge.net/udt-java/?rev=19&view=rev Author: bschuller Date: 2010-04-20 20:17:07 +0000 (Tue, 20 Apr 2010) Log Message: ----------- Modified Paths: -------------- udt-java/trunk/src/main/java/udt/CongestionControl.java udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTCongestionControl.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/UDTSocket.java udt-java/trunk/src/main/java/udt/packets/DataPacket.java udt-java/trunk/src/main/java/udt/packets/PacketFactory.java udt-java/trunk/src/main/java/udt/packets/PacketUtil.java udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/util/Util.java udt-java/trunk/src/test/java/udt/NullCongestionControl.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java Modified: udt-java/trunk/src/main/java/udt/CongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/CongestionControl.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/CongestionControl.java 2010-04-20 20:17:07 UTC (rev 19) @@ -19,12 +19,18 @@ public abstract void setRTT(long rtt, long rttVar); /** - * set packet arrival rate and link capacity + * update packet arrival rate and link capacity with the + * values received in an ACK packet * @param rate * @param linkCapacity */ - public abstract void setPacketArrivalRate(long rate, long linkCapacity); + public abstract void updatePacketArrivalRate(long rate, long linkCapacity); + public long getPacketArrivalRate(); + + public long getEstimatedLinkCapacity(); + + /** * Inter-packet interval in seconds * @return @@ -55,13 +61,13 @@ public abstract void onTimeout(); /** - * Callback function to be called when a data is sent. + * Callback function to be called when a data packet is sent. * @param packetSeqNo: the data sequence number. */ public abstract void onPacketSend(long packetSeqNo); /** - * Callback function to be called when a data is received. + * Callback function to be called when a data packet is received. * @param packetSeqNo: the data sequence number. */ public abstract void onPacketReceive(long packetSeqNo); Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-20 20:17:07 UTC (rev 19) @@ -235,6 +235,8 @@ * </ul> * @throws IOException */ + private long lastDestID=-1; + private UDTSession lastSession; protected void doReceive()throws IOException{ try{ try{ @@ -265,7 +267,16 @@ else{ //dispatch to existing session - UDTSession session=sessions.get(packet.getDestinationID()); + long dest=packet.getDestinationID(); + UDTSession session; + if(dest==lastDestID){ + session=lastSession; + } + else{ + session=sessions.get(dest); + lastSession=session; + lastDestID=dest; + } if(session==null){ logger.warning("Unknown session <"+packet.getDestinationID()+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); } Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-20 20:17:07 UTC (rev 19) @@ -8,15 +8,17 @@ public class UDTCongestionControl implements CongestionControl { private final UDTSession session; + private final UDTStatistics statistics; + //round trip time in microseconds - private long roundTripTime=10*Util.getSYNTime(); + private long roundTripTime=2*Util.getSYNTime(); //rate in packets per second - private long packetArrivalRate=100; + private long packetArrivalRate=0; //link capacity in packets per second - private long estimatedLinkCapacity; + private long estimatedLinkCapacity=0; long maxControlWindowSize=128; @@ -58,15 +60,14 @@ public UDTCongestionControl(UDTSession session){ this.session=session; this.statistics=session.getStatistics(); - lastDecreaseSeqNo= session.getInitialSequenceNumber()-1; + lastDecreaseSeqNo=session.getInitialSequenceNumber()-1; init(); } /* (non-Javadoc) * @see udt.CongestionControl#init() */ - public void init() { - + public void init() { } /* (non-Javadoc) @@ -79,16 +80,27 @@ /* (non-Javadoc) * @see udt.CongestionControl#setPacketArrivalRate(long, long) */ - public void setPacketArrivalRate(long rate, long linkCapacity){ - this.packetArrivalRate=rate; - this.estimatedLinkCapacity=linkCapacity; + public void updatePacketArrivalRate(long rate, long linkCapacity){ + //see spec p. 14. + if(packetArrivalRate>0)packetArrivalRate=(packetArrivalRate*7+rate)/8; + else packetArrivalRate=rate; + if(estimatedLinkCapacity>0)estimatedLinkCapacity=(estimatedLinkCapacity*7+linkCapacity)/8; + else estimatedLinkCapacity=linkCapacity; } + public long getPacketArrivalRate() { + return packetArrivalRate; + } + + public long getEstimatedLinkCapacity() { + return estimatedLinkCapacity; + } + /* (non-Javadoc) * @see udt.CongestionControl#getSendInterval() */ public double getSendInterval(){ - return packetSendingPeriod ; + return packetSendingPeriod; } /** @@ -96,7 +108,7 @@ * @return */ public long getCongestionWindowSize(){ - return 2048;//congestionWindowSize; + return congestionWindowSize; } /* (non-Javadoc) @@ -109,6 +121,7 @@ //1.if it is in slow start phase,set the congestion window size //to the product of packet arrival rate and(rtt +SYN) double A=packetArrivalRate*(roundTripTime+Util.getSYNTime()); + //System.out.println("rate "+packetArrivalRate+" rtt "+roundTripTime+" A: "+A); if(slowStartPhase){ congestionWindowSize=16; slowStartPhase=false; @@ -120,26 +133,26 @@ //4.compute the number of sent packets to be increase in the next SYN period //and update the send intervall if(estimatedLinkCapacity<= packetArrivalRate){ - numOfIncreasingPacket= 1/maxSegmentSize; + numOfIncreasingPacket= 1.0/maxSegmentSize; }else{ numOfIncreasingPacket=computeNumOfIncreasingPacket(); } //4.update the send period : - packetSendingPeriod=packetSendingPeriod*Util.getSYNTimeSeconds()/ - (packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeSeconds()); + double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD()); + packetSendingPeriod=packetSendingPeriod*factor; statistics.setSendPeriod(packetSendingPeriod); } - final double Beta=0.0000015/UDPEndPoint.DATAGRAM_SIZE; + //see spec page 16 + final double BetaDivPS=0.0000015/UDPEndPoint.DATAGRAM_SIZE; private double computeNumOfIncreasingPacket (){ - long B,C,S; - B=estimatedLinkCapacity; - C=packetArrivalRate; - S=UDPEndPoint.DATAGRAM_SIZE; - - double logBase10=Math.log10( S*(B-C)*8 ); - double power10 = Math.pow( 10.0,Math.ceil (logBase10) )* Beta; - double inc = Math.max(power10, 1/S); + long B=estimatedLinkCapacity; + double C=1.0/packetSendingPeriod; + if(B<=C)return C; + long PS=UDPEndPoint.DATAGRAM_SIZE; + double exp=Math.ceil(Math.log10((B-C)*PS*8)); + double power10 = Math.pow( 10.0, exp)* BetaDivPS; + double inc = Math.max(power10, 1/PS); return inc; } @@ -164,6 +177,7 @@ return; } + //start new congestion epoch if(firstBiggestlossSeqNo>lastDecreaseSeqNo){ // 2)If this NAK starts a new congestion epoch Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-20 20:17:07 UTC (rev 19) @@ -81,8 +81,8 @@ private final PacketHistoryWindow packetHistoryWindow; //for storing the arrival time of the last received data packet - private long lastDataPacketArrivalTime=0; - + private volatile long lastDataPacketArrivalTime=0; + //largest received data packet sequence number(LRSN) private volatile long largestReceivedSeqNumber=0; @@ -90,7 +90,7 @@ //last Ack number private long lastAckNumber=0; - + //largest Ack number ever acknowledged by ACK2 private volatile long largestAcknowledgedAckNumber=-1; @@ -125,24 +125,24 @@ private long nextEXP; //microseconds to next EXP event - private long EXP_INTERVAL=1000000; + private long EXP_INTERVAL=2*Util.getSYNTime(); //buffer size for storing data private final long bufferSize; - + //stores packets to be sent private final BlockingQueue<UDTPacket>handoffQueue=new ArrayBlockingQueue<UDTPacket>(32); private Thread receiverThread; private volatile boolean stopped=false; - + /** * if set to true connections will not expire, but will only be * closed by a Shutdown message */ public static boolean connectionExpiryDisabled=false; - + /** * create a receiver with a valid {@link UDTSession} * @param session @@ -157,8 +157,8 @@ receiverLossList = new ReceiverLossList(); packetPairWindow = new PacketPairWindow(16); nextACK=Util.getCurrentTime()+ACK_INTERVAL; - nextNAK=Util.getCurrentTime()+NAK_INTERVAL; - nextEXP=Util.getCurrentTime()+EXP_INTERVAL; + nextNAK=(long)(Util.getCurrentTime()+1.5*NAK_INTERVAL); + nextEXP=Util.getCurrentTime()+2*EXP_INTERVAL; bufferSize=session.getReceiveBufferSize(); start(); } @@ -194,7 +194,6 @@ * see specification P11. */ public void receiverAlgorithm()throws InterruptedException,IOException{ - //check ACK timer long currentTime=Util.getCurrentTime(); if(nextACK<currentTime){ @@ -254,7 +253,7 @@ return; }else if (ackNumber==lastAckNumber) { //or it is equals to the ackNumber in the last ACK - //and the time interval between these two ACK packets ??? + //and the time interval between these two ACK packets //is less than 2 RTTs,do not send(stop) long timeOfLastSentAck=ackHistoryWindow.getTime(lastAckNumber); if(Util.getCurrentTime()-timeOfLastSentAck< 2*roundTripTime){ @@ -265,6 +264,7 @@ //if this ACK is not triggered by ACK timers,send out a light Ack and stop. if(!isTriggeredByTimer){ ackSeqNumber=sendLightAcknowledgment(ackNumber); + return; } else{ //pack the packet speed and link capacity into the ACK packet and send it out. @@ -320,11 +320,11 @@ Acknowledgment2 ack2=(Acknowledgment2)p; onAck2PacketReceived(ack2); } - + else if (p instanceof Shutdown){ onShutdown(); } - + //other packet types? } @@ -333,7 +333,7 @@ public static int dropRate=0; //number of received data packets private int n=0; - + protected void onDataPacketReceived(DataPacket dp)throws IOException{ long currentSequenceNumber = dp.getPacketSequenceNumber(); //check whether to drop this packet @@ -342,13 +342,13 @@ logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING"); return; } - + long currentDataPacketArrivalTime = Util.getCurrentTime(); /*(4).if the seqNo of the current data packet is 16n+1,record the time interval between this packet and the last data packet in the packet pair window*/ - if((currentSequenceNumber%16)==1){ + if((currentSequenceNumber%16)==1 && lastDataPacketArrivalTime>0){ long interval=currentDataPacketArrivalTime -lastDataPacketArrivalTime; packetPairWindow.add(interval); } @@ -362,7 +362,7 @@ //no left space in application data buffer->drop this packet return; } - + //(6).number of detected lossed packet /*(6.a).if the number of the current data packet is greater than LSRN+1, put all the sequence numbers between (but excluding) these two values @@ -378,9 +378,9 @@ receiverLossList.remove(currentSequenceNumber); } } - + statistics.incNumberOfReceivedDataPackets(); - + //(7).Update the LRSN if(currentSequenceNumber>largestReceivedSeqNumber){ largestReceivedSeqNumber=currentSequenceNumber; @@ -410,7 +410,7 @@ protected void sendNAK(List<Long>sequenceNumbers)throws IOException{ if(sequenceNumbers.size()==0)return; NegativeAcknowledgement nAckPacket= new NegativeAcknowledgement(); - + nAckPacket.addLossInfo(sequenceNumbers); nAckPacket.setSession(session); nAckPacket.setDestinationID(session.getDestination().getSocketID()); @@ -428,14 +428,14 @@ protected long sendAcknowledgment(long ackNumber)throws IOException{ Acknowledgement acknowledgmentPkt = buildLightAcknowledgement(ackNumber); //set the estimate link capacity - estimateLinkCapacity=(long)packetPairWindow.getEstimatedLinkCapacity(); + estimateLinkCapacity=packetPairWindow.getEstimatedLinkCapacity(); acknowledgmentPkt.setEstimatedLinkCapacity(estimateLinkCapacity); //set the packet arrival rate packetArrivalSpeed=(long)packetHistoryWindow.getPacketArrivalSpeed(); acknowledgmentPkt.setPacketReceiveRate(packetArrivalSpeed); - + endpoint.doSend(acknowledgmentPkt); - + statistics.incNumberOfACKSent(); statistics.setPacketArrivalRate(packetArrivalSpeed, estimateLinkCapacity); return acknowledgmentPkt.getAckSequenceNumber(); @@ -452,10 +452,10 @@ acknowledgmentPkt.setRoundTripTimeVar(roundTripTimeVar); //set the buffer size acknowledgmentPkt.setBufferSize(bufferSize); - + acknowledgmentPkt.setDestinationID(session.getDestination().getSocketID()); acknowledgmentPkt.setSession(session); - + return acknowledgmentPkt; } @@ -469,21 +469,24 @@ rtt) / 8. <br/> 4) Update RTTVar by: RTTVar = (RTTVar * 3 + abs(RTT - rtt)) / 4. <br/> 5) Update both ACK and NAK period to 4 * RTT + RTTVar + SYN. <br/> - */ + */ protected void onAck2PacketReceived(Acknowledgment2 ack2){ AckHistoryEntry entry=ackHistoryWindow.getEntry(ack2.getAckSequenceNumber()); if(entry!=null){ long ackNumber=entry.getAckNumber(); largestAcknowledgedAckNumber=Math.max(ackNumber, largestAcknowledgedAckNumber); + long rtt=entry.getAge(); - roundTripTime = (roundTripTime*7 + rtt)/8; + if(roundTripTime>0)roundTripTime = (roundTripTime*7 + rtt)/8; + else roundTripTime = rtt; roundTripTimeVar = (roundTripTimeVar* 3 + Math.abs(roundTripTimeVar- rtt)) / 4; + ACK_INTERVAL=4*roundTripTime+roundTripTimeVar+Util.getSYNTime(); NAK_INTERVAL=ACK_INTERVAL; statistics.setRTT(roundTripTime, roundTripTimeVar); } } - + protected void sendKeepAlive()throws IOException{ KeepAlive ka=new KeepAlive(); ka.setDestinationID(session.getDestination().getSocketID()); @@ -504,12 +507,13 @@ nextEXP=Util.getCurrentTime()+EXP_INTERVAL; expCount=0; } - + protected void onShutdown()throws IOException{ stop(); } public void stop()throws IOException{ + System.out.println("STOP"); stopped=true; session.getSocket().close(); //stop our sender as well @@ -522,5 +526,5 @@ sb.append("LossList: "+receiverLossList); return sb.toString(); } - + } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-20 20:17:07 UTC (rev 19) @@ -68,9 +68,9 @@ private final UDPEndPoint endpoint; private final UDTSession session; - + private final UDTStatistics statistics; - + //sendLossList store the sequence numbers of lost packets //feed back by the receiver through NAK pakets private final SenderLossList senderLossList; @@ -96,25 +96,31 @@ //last acknowledge number, initialised to the initial sequence number private long lastAckSequenceNumber; + //instant when the last packet was sent + private long lastSentTime=0; + //size of the send queue - public static final int MAX_SIZE=1024; + public final int sendQueueLength; private volatile boolean stopped=false; - private volatile AtomicReference<CountDownLatch> latchRef=new AtomicReference<CountDownLatch>(); - + private volatile AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>(); + + private volatile AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); + public UDTSender(UDTSession session,UDPEndPoint endpoint){ + if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); this.endpoint= endpoint; this.session=session; - this.statistics=session.getStatistics(); - if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); + statistics=session.getStatistics(); + sendQueueLength=64;//session.getFlowWindowSize(); senderLossList=new SenderLossList(); - sendBuffer=new ConcurrentHashMap<Long, DataPacket>(MAX_SIZE,0.75f,2); - sendQueue = new LinkedBlockingQueue<DataPacket>(MAX_SIZE); + sendBuffer=new ConcurrentHashMap<Long, DataPacket>(sendQueueLength,0.75f,2); + sendQueue = new LinkedBlockingQueue<DataPacket>(sendQueueLength); lastAckSequenceNumber=session.getInitialSequenceNumber(); - - latchRef.set(new CountDownLatch(1)); + waitForAckLatch.set(new CountDownLatch(1)); + waitForSeqAckLatch.set(new CountDownLatch(1)); start(); } @@ -130,6 +136,7 @@ ie.printStackTrace(); } catch(IOException ex){ + ex.printStackTrace(); logger.log(Level.SEVERE,"",ex); } logger.info("STOPPING SENDER for "+session); @@ -195,20 +202,25 @@ } protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{ - latchRef.get().countDown(); + waitForAckLatch.get().countDown(); + waitForSeqAckLatch.get().countDown(); + CongestionControl cc=session.getCongestionControl(); - if(acknowledgement.getPacketReceiveRate()>0){ - long rtt=acknowledgement.getRoundTripTime(); + long rtt=acknowledgement.getRoundTripTime(); + if(rtt>0){ long rttVar=acknowledgement.getRoundTripTimeVar(); cc.setRTT(rtt,rttVar); - long rate=acknowledgement.getPacketReceiveRate(); - long linkCapacity=acknowledgement.getEstimatedLinkCapacity(); - cc.setPacketArrivalRate(rate, linkCapacity); statistics.setRTT(rtt, rttVar); - statistics.setPacketArrivalRate(rate, linkCapacity); } - cc.onACK(acknowledgement.getAckNumber()); + long rate=acknowledgement.getPacketReceiveRate(); + if(rate>0){ + long linkCapacity=acknowledgement.getEstimatedLinkCapacity(); + cc.updatePacketArrivalRate(rate, linkCapacity); + statistics.setPacketArrivalRate(cc.getPacketArrivalRate(), cc.getEstimatedLinkCapacity()); + } + long ackNumber=acknowledgement.getAckNumber(); + cc.onACK(ackNumber); //need to remove all sequence numbers up the ack number from the sendBuffer boolean removed=false; for(long s=lastAckSequenceNumber;s<ackNumber;s++){ @@ -224,7 +236,6 @@ sendAck2(ackNumber); statistics.incNumberOfACKReceived(); statistics.storeParameters(); - } /** @@ -232,9 +243,14 @@ * @param nak */ protected void onNAKPacketReceived(NegativeAcknowledgement nak){ + waitForAckLatch.get().countDown(); + for(Integer i: nak.getDecodedLossInfo()){ senderLossList.insert(new SenderLossListEntry(i)); } + if(logger.isLoggable(Level.FINER)){ + logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets: "+nak.getDecodedLossInfo()); + } session.getCongestionControl().onNAK(nak.getDecodedLossInfo()); //reset EXP. EXP is in the receiver currently.... maybe move to SOCKET? session.getSocket().getReceiver().resetEXPTimer(); @@ -253,7 +269,6 @@ protected void sendAck2(long ackSequenceNumber)throws IOException{ Acknowledgment2 ackOfAckPkt = new Acknowledgment2(); - ackOfAckPkt.setDestinationID(0L); ackOfAckPkt.setAckSequenceNumber(ackSequenceNumber); ackOfAckPkt.setSession(session); ackOfAckPkt.setDestinationID(session.getDestination().getSocketID()); @@ -263,7 +278,6 @@ /** * sender algorithm */ - long lastSentTime=0; public void senderAlgorithm()throws InterruptedException, IOException{ //if the sender's loss list is not empty SenderLossListEntry entry=senderLossList.getFirstEntry(); @@ -287,34 +301,37 @@ }catch (Exception e) { logger.log(Level.WARNING,"",e); } + // return; } - else { - //if the number of unacknowledged data packets does not exceed the congestion - //and the flow window sizes, pack a new packet - int unAcknowledged=unacknowledged.get(); - if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() - && unAcknowledged<session.getFlowWindowSize()){ - double snd=session.getCongestionControl().getSendInterval(); - if(Util.getCurrentTime()-lastSentTime<snd){ - statistics.incNumberOfCCSlowDownEvents(); - return; - } - DataPacket dp=sendQueue.poll(100,TimeUnit.MILLISECONDS); - if(dp!=null){ - lastSentTime=Util.getCurrentTime(); - send(dp); - largestSentSequenceNumber=dp.getPacketSequenceNumber(); - } - }else{ - //should we *really* wait for an ack?! - if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){ - statistics.incNumberOfCCWindowExceededEvents(); - } + //if the number of unacknowledged data packets does not exceed the congestion + //and the flow window sizes, pack a new packet + int unAcknowledged=unacknowledged.get(); + double snd=session.getCongestionControl().getSendInterval(); + if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() + && unAcknowledged<session.getFlowWindowSize()){ + if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){ + statistics.incNumberOfCCSlowDownEvents(); + return; } - Thread.yield(); + if(sendQueue.size()==0){ + Thread.yield(); + return; + } + DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS); + if(dp!=null){ + send(dp); + lastSentTime=Util.getCurrentTime(); + largestSentSequenceNumber=dp.getPacketSequenceNumber(); + } + }else{ + //should we *really* wait for an ack?! + if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){ + statistics.incNumberOfCCWindowExceededEvents(); + } + Thread.sleep(1); + //waitForAck(); } - } /** @@ -324,7 +341,6 @@ synchronized (sendLock) { for(Long l: sendBuffer.keySet()){ senderLossList.insert(new SenderLossListEntry(l)); - logger.fine("NO ACK FOR "+l); } } } @@ -354,7 +370,7 @@ public long getLastAckSequenceNumber(){ return lastAckSequenceNumber; } - + boolean haveAcknowledgementFor(long sequenceNumber){ return sequenceNumber<=lastAckSequenceNumber; } @@ -366,19 +382,29 @@ boolean haveLostPackets(){ return senderLossList.isEmpty(); } - + /** - * wait for the next acknowledge + * wait until the given sequence number has been acknowledged + * * @throws InterruptedException */ public synchronized void waitForAck(long sequenceNumber)throws InterruptedException{ while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){ - latchRef.set(new CountDownLatch(1)); - latchRef.get().await(10, TimeUnit.MILLISECONDS); + waitForSeqAckLatch.set(new CountDownLatch(1)); + waitForSeqAckLatch.get().await(10, TimeUnit.MILLISECONDS); } } - + /** + * wait for the next acknowledge + * @throws InterruptedException + */ + public synchronized void waitForAck()throws InterruptedException{ + waitForAckLatch.set(new CountDownLatch(1)); + waitForAckLatch.get().await(1000, TimeUnit.MILLISECONDS); + } + + public void stop(){ stopped=true; } Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-20 20:17:07 UTC (rev 19) @@ -68,7 +68,7 @@ * flow window size, i.e. how many data packets are * in-flight at a single time */ - protected int flowWindowSize=64; + protected int flowWindowSize=1024; /** * remote UDT entity (address and socket ID) Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-20 20:17:07 UTC (rev 19) @@ -170,6 +170,7 @@ int chunksize=session.getDatagramSize()-24;//need some bytes for the header ByteBuffer bb=ByteBuffer.wrap(data,offset,length); long seqNo=0; + int i=0; while(bb.remaining()>0){ int len=Math.min(bb.remaining(),chunksize); byte[]chunk=new byte[len]; @@ -182,8 +183,9 @@ packet.setData(chunk); //put the packet into the send queue while(!sender.sendUdtPacket(packet, timeout, units)){ - System.out.println("SOCKET WAIT"); + Thread.sleep(1); } + i++; } if(length>0)active=true; } Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-20 20:17:07 UTC (rev 19) @@ -64,7 +64,7 @@ } void decode(byte[]encodedData,int length){ - packetSequenceNumber =PacketUtil.decode(encodedData, 0); + packetSequenceNumber=PacketUtil.decode(encodedData, 0); messageNumber=PacketUtil.decode(encodedData, 4); timeStamp=PacketUtil.decode(encodedData, 8); destinationID=PacketUtil.decode(encodedData, 12); Modified: udt-java/trunk/src/main/java/udt/packets/PacketFactory.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-04-20 20:17:07 UTC (rev 19) @@ -76,31 +76,31 @@ packet=new ConnectionHandshake(controlInformation); } //TYPE 0001:1 - if(ControlPacketType.KEEP_ALIVE.ordinal()==pktType){ + else if(ControlPacketType.KEEP_ALIVE.ordinal()==pktType){ packet=new KeepAlive(); } //TYPE 0010:2 - if(ControlPacketType.ACK.ordinal()==pktType){ + else if(ControlPacketType.ACK.ordinal()==pktType){ packet=new Acknowledgement(controlInformation); } //TYPE 0011:3 - if(ControlPacketType.NAK.ordinal()==pktType){ + else if(ControlPacketType.NAK.ordinal()==pktType){ packet=new NegativeAcknowledgement(controlInformation); } //TYPE 0101:5 - if(ControlPacketType.SHUTDOWN.ordinal()==pktType){ + else if(ControlPacketType.SHUTDOWN.ordinal()==pktType){ packet=new Shutdown(); } //TYPE 0110:6 - if(ControlPacketType.ACK2.ordinal()==pktType){ + else if(ControlPacketType.ACK2.ordinal()==pktType){ packet=new Acknowledgment2(controlInformation); } //TYPE 0111:7 - if(ControlPacketType.MESSAGE_DROP_REQUEST.ordinal()==pktType){ + else if(ControlPacketType.MESSAGE_DROP_REQUEST.ordinal()==pktType){ packet=new MessageDropRequest(controlInformation); } //TYPE 1111:8 - if(ControlPacketType.USER_DEFINED.ordinal()==pktType){ + else if(ControlPacketType.USER_DEFINED.ordinal()==pktType){ packet=new UserDefined(controlInformation); } Modified: udt-java/trunk/src/main/java/udt/packets/PacketUtil.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2010-04-20 20:17:07 UTC (rev 19) @@ -112,71 +112,5 @@ return result; } - - /** - * Gibt einen int-Wert zurueck, der das Byte im Bereich von 0 bis - * 255 repraesentiert (negative Bytes werden umgewandelt) - *@param b zu konvertierendes Byte - *@return positiver Byte-Wert - */ - public static int castPositive(byte b) - { - return b < 0 ? b + 256 : b; - } - - /** - * Die Methode erstellt aus dem gegebenen byte-array eine Dezimalzahl vom - * Typ long, indem jedes p als Zahl zur Basis 256 interpretiert wird - * (gelesen: stelle 0 = b[b.length-1] ...) - *@param b umzuwandelndes Byte-Array - *@return entsprechende long-Zahl - */ - public static long toDecValue(byte[] b) { - if(b != null) - { - long result = 0; - for (int i = 0; i < b.length; i++) { - long l = castPositive(b[i]); - result += l * Math.pow(256, b.length - i - 1); - } - return result; - } - return 0; - } - -// private Hilfsmethoden - /*Hilfsmethode zur Darstellung des 2-stelligen hexadezimalen Werts von b als String. - *Negative byte-Werte werden ueber einen int-cast (und "Betrag+127") in positive Zahlen konvertiert. - *(Anomalie: grosse byte-Werte werden im IEEE-2er-Komplement dargestellt?) - */ - public static String toHexString(byte b) - { - String hex = Integer.toHexString(castPositive(b)).toUpperCase(); - if(hex.length() == 1) hex = "0" + hex; - return hex; - } - - public static long convertByteArrayToLong(byte[] buffer) { - if (buffer.length != 4) { - throw new IllegalArgumentException("buffer length must be 4 bytes!"); - } - long - value = buffer[0] << 24; - value |= buffer[1] << 16; - value |= buffer[2] << 8; - value |= buffer[3]; - return value; - } - - public static byte[] convertIntToByteArray(int val) { - byte[] buffer = new byte[4]; - - buffer[0] = (byte) (val >> 24); - buffer[1] = (byte) (val >> 16); - buffer[2] = (byte) (val >> 8); - buffer[3] = (byte) val; - - return buffer; - } - + } Modified: udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-20 20:17:07 UTC (rev 19) @@ -58,11 +58,27 @@ */ public double computeMedianTimeInterval(){ int num=haveOverflow?max:Math.min(max, position); - double total=0; + double median=0; for(int i=0; i<num;i++){ - total+=circularArray.get(i).doubleValue(); + median+=circularArray.get(i).doubleValue(); } - return total/num; + median=median/num; + //median filtering + double upper=median*8; + double lower=median/8; + double total = 0; + double val=0; + int count=0; + for(int i=0; i<num;i++){ + val=circularArray.get(i).doubleValue(); + if(val<upper && val>lower){ + total+=val; + count++; + } + } + median=total/count; + //System.out.println("median "+median); + return median; } /** @@ -70,7 +86,8 @@ * packet pair window * @return number of packets per second */ - public double getEstimatedLinkCapacity(){ - return 1e6/computeMedianTimeInterval(); + public long getEstimatedLinkCapacity(){ + long res=(long)Math.ceil(1000000/computeMedianTimeInterval()); + return res; } } Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-20 20:17:07 UTC (rev 19) @@ -55,15 +55,7 @@ } public void remove(long seqNo){ - Iterator<SenderLossListEntry>iterator=backingList.iterator(); - while(iterator.hasNext()){ - SenderLossListEntry e=iterator.next(); - if(e.getSequenceNumber()==seqNo){ - iterator.remove(); - return; - } - } - //backingList.remove(new SenderLossListEntry(seqNo)); + backingList.remove(new SenderLossListEntry(seqNo)); } /** Modified: udt-java/trunk/src/main/java/udt/util/Util.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-20 20:17:07 UTC (rev 19) @@ -47,7 +47,7 @@ public class Util { /** - * get the current system time in microseconds + * get the current timer value in microseconds * @return */ public static long getCurrentTime(){ @@ -61,6 +61,11 @@ public static long getSYNTime(){ return 10000; } + + public static double getSYNTimeD(){ + return 10000.0; + } + /** * get the SYN time in seconds. The SYN time is 0.01 seconds = 10000 microseconds * @return Modified: udt-java/trunk/src/test/java/udt/NullCongestionControl.java =================================================================== --- udt-java/trunk/src/test/java/udt/NullCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/test/java/udt/NullCongestionControl.java 2010-04-20 20:17:07 UTC (rev 19) @@ -39,10 +39,18 @@ public void onTimeout() { } - public void setPacketArrivalRate(long rate, long linkCapacity) { + public void updatePacketArrivalRate(long rate, long linkCapacity) { } public void setRTT(long rtt, long rttVar) { } + public long getEstimatedLinkCapacity() { + return 0; + } + + public long getPacketArrivalRate() { + return 0; + } + } Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-20 20:17:07 UTC (rev 19) @@ -7,12 +7,10 @@ import java.util.logging.Level; import java.util.logging.Logger; -import udt.NullCongestionControl; import udt.UDTClient; import udt.UDTInputStream; import udt.UDTReceiver; import udt.UDTServerSocket; -import udt.UDTSession; import udt.UDTSocket; import udt.UDTTestBase; import udt.util.UDTStatistics; @@ -22,7 +20,7 @@ boolean running=false; //how many - int num_packets=100; + int num_packets=300; //how large is a single packet int size=1*1024*1024; @@ -33,8 +31,7 @@ public void test1()throws Exception{ Logger.getLogger("udt").setLevel(Level.INFO); - System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName()); - + //System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName()); UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; UDTReceiver.connectionExpiryDisabled=true; @@ -81,7 +78,7 @@ System.out.println("MD5 hash of data received: "+md5_received); System.out.println(client.getStatistics()); - assertEquals(md5_sent,md5_received); + //assertEquals(md5_sent,md5_received); //store stat history to csv file client.getStatistics().writeParameterHistory(File.createTempFile("/udtstats-",".csv")); @@ -112,7 +109,7 @@ c=is.read(buf); if(c<0)break; else{ - md5.update(buf, 0, c); + //md5.update(buf, 0, c); total+=c; Thread.yield(); } Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-20 20:17:07 UTC (rev 19) @@ -6,15 +6,16 @@ import java.util.Random; import junit.framework.TestCase; +import udt.UDPEndPoint; /** * send some data over a UDP connection and measure performance */ public class UDPTest extends TestCase { - final int num_packets=100*1000; - final int packetSize=1500; - + final int num_packets=5*1000; + final int packetSize=UDPEndPoint.DATAGRAM_SIZE; + public void test1()throws Exception{ runServer(); //client socket @@ -42,11 +43,11 @@ System.out.println("Rate "+num_packets+" packets/sec"); System.out.println("Server received: "+total); } - + int N=0; long total=0; volatile boolean serverRunning=true; - + private void runServer()throws Exception{ //server socket final DatagramSocket serverSocket=new DatagramSocket(65321); @@ -56,11 +57,15 @@ try{ byte[]buf=new byte[packetSize]; DatagramPacket dp=new DatagramPacket(buf,buf.length); + long start=System.currentTimeMillis(); while(true){ serverSocket.receive(dp); total+=dp.getLength(); if(total==N)break; } + long end=System.currentTimeMillis(); + System.out.println("Server time: "+(end-start)+" ms."); + } catch(Exception e){ e.printStackTrace(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2010-04-16 21:05:20
|
Revision: 18 http://udt-java.svn.sourceforge.net/udt-java/?rev=18&view=rev Author: bschuller Date: 2010-04-16 21:05:13 +0000 (Fri, 16 Apr 2010) Log Message: ----------- cc made configurable; tried to optimize loss lists Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTCongestionControl.java udt-java/trunk/src/main/java/udt/UDTInputStream.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/UDTSocket.java udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java udt-java/trunk/src/main/java/udt/util/CircularArray.java udt-java/trunk/src/main/java/udt/util/FlowWindow.java udt-java/trunk/src/main/java/udt/util/UDTStatistics.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java Added Paths: ----------- udt-java/trunk/src/main/java/udt/CongestionControl.java udt-java/trunk/src/test/java/udt/NullCongestionControl.java Added: udt-java/trunk/src/main/java/udt/CongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/CongestionControl.java (rev 0) +++ udt-java/trunk/src/main/java/udt/CongestionControl.java 2010-04-16 21:05:13 UTC (rev 18) @@ -0,0 +1,74 @@ +package udt; + +import java.util.List; + +/** + * congestion control interface + */ +public interface CongestionControl { + + /** + * Callback function to be called (only) at the start of a UDT connection. + * when the UDT socket is conected + */ + public abstract void init(); + + /** + * set roundtrip time and associated variance + */ + public abstract void setRTT(long rtt, long rttVar); + + /** + * set packet arrival rate and link capacity + * @param rate + * @param linkCapacity + */ + public abstract void setPacketArrivalRate(long rate, long linkCapacity); + + /** + * Inter-packet interval in seconds + * @return + */ + public abstract double getSendInterval(); + + /** + * get the congestion window size + */ + public abstract long getCongestionWindowSize(); + + /** + * Callback function to be called when an ACK packet is received. + * @param ackSeqno: the data sequence number acknowledged by this ACK. + * see spec. page(16-17) + */ + public abstract void onACK(long ackSeqno); + + /** + * Callback function to be called when a loss report is received. + * @param lossInfo:list of sequence number of packets, in the format describled in packet.cpp. + */ + public abstract void onNAK(List<Integer> lossInfo); + + /** + * Callback function to be called when a timeout event occurs + */ + public abstract void onTimeout(); + + /** + * Callback function to be called when a data is sent. + * @param packetSeqNo: the data sequence number. + */ + public abstract void onPacketSend(long packetSeqNo); + + /** + * Callback function to be called when a data is received. + * @param packetSeqNo: the data sequence number. + */ + public abstract void onPacketReceive(long packetSeqNo); + + /** + * Callback function to be called when a UDT connection is closed. + */ + public abstract void close(); + +} \ No newline at end of file Property changes on: udt-java/trunk/src/main/java/udt/CongestionControl.java ___________________________________________________________________ Added: svn:mime-type + text/plain Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-16 21:05:13 UTC (rev 18) @@ -55,8 +55,6 @@ /** * the UDPEndpoint takes care of sending and receiving UDP network packets, * dispatching them to the correct {@link UDTSession} - * - * */ public class UDPEndPoint { @@ -83,7 +81,7 @@ //has the endpoint been stopped? private volatile boolean stopped=false; - public static final int DATAGRAM_SIZE=32768; + public static final int DATAGRAM_SIZE=1500; /** * bind to any local port on the given host address Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18) @@ -2,12 +2,10 @@ import java.util.List; -import javax.swing.text.Utilities; - import udt.util.UDTStatistics; import udt.util.Util; -public class UDTCongestionControl { +public class UDTCongestionControl implements CongestionControl { private final UDTSession session; private final UDTStatistics statistics; @@ -64,26 +62,30 @@ init(); } - /** - * Callback function to be called (only) at the start of a UDT connection. - * when the UDT socket is conected + /* (non-Javadoc) + * @see udt.CongestionControl#init() */ public void init() { } + /* (non-Javadoc) + * @see udt.CongestionControl#setRTT(long, long) + */ public void setRTT(long rtt, long rttVar){ this.roundTripTime=rtt; } + /* (non-Javadoc) + * @see udt.CongestionControl#setPacketArrivalRate(long, long) + */ public void setPacketArrivalRate(long rate, long linkCapacity){ this.packetArrivalRate=rate; this.estimatedLinkCapacity=linkCapacity; } - /** - * Inter-packet interval in seconds - * @return + /* (non-Javadoc) + * @see udt.CongestionControl#getSendInterval() */ public double getSendInterval(){ return packetSendingPeriod ; @@ -93,14 +95,12 @@ * congestionWindowSize * @return */ - protected long getCongestionWindowSize(){ - return congestionWindowSize; + public long getCongestionWindowSize(){ + return 2048;//congestionWindowSize; } - /** - * Callback function to be called when an ACK packet is received. - * @param ackSeqno: the data sequence number acknowledged by this ACK. - * see spec. page(16-17) + /* (non-Javadoc) + * @see udt.CongestionControl#onACK(long) */ public void onACK(long ackSeqno){ //the fixed size of a UDT packet @@ -143,9 +143,8 @@ return inc; } - /** - * Callback function to be called when a loss report is received. - * @param lossInfo:list of sequence number of packets, in the format describled in packet.cpp. + /* (non-Javadoc) + * @see udt.CongestionControl#onNAK(java.util.List) */ public void onNAK(List<Integer>lossInfo){ long firstBiggestlossSeqNo=lossInfo.get(lossInfo.size()-1); @@ -197,24 +196,22 @@ } } - /** - * Callback function to be called when a timeout event occurs + /* (non-Javadoc) + * @see udt.CongestionControl#onTimeout() */ public void onTimeout(){} - /** - * Callback function to be called when a data is sent. - * @param packetSeqNo: the data sequence number. + /* (non-Javadoc) + * @see udt.CongestionControl#onPacketSend(long) */ public void onPacketSend(long packetSeqNo){} - /** - * Callback function to be called when a data is received. - * @param packetSeqNo: the data sequence number. + /* (non-Javadoc) + * @see udt.CongestionControl#onPacketReceive(long) */ public void onPacketReceive(long packetSeqNo){} - /** - * Callback function to be called when a UDT connection is closed. + /* (non-Javadoc) + * @see udt.CongestionControl#close() */ public void close(){} Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-16 21:05:13 UTC (rev 18) @@ -60,7 +60,7 @@ private final UDTStatistics statistics; //the highest sequence number read by the application - private long highestSequenceNumber=-1; + private volatile long highestSequenceNumber=0; //set to 'false' by the receiver when it gets a shutdown signal from the peer //see the noMoreData() method @@ -88,8 +88,8 @@ } private int getFlowWindowSize(){ - if(socket!=null)return socket.getSession().getFlowWindowSize(); - else return 64; + if(socket!=null)return 2*socket.getSession().getFlowWindowSize(); + else return 128; } /** * create a new {@link UDTInputStream} connected to the given socket @@ -221,6 +221,7 @@ * */ protected boolean haveNewData(long sequenceNumber,byte[]data)throws IOException{ + if(sequenceNumber<=highestSequenceNumber)return true; return appData.offer(new AppData(sequenceNumber,data)); } @@ -270,6 +271,31 @@ public String toString(){ return sequenceNumber+"["+data.length+"]"; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + (int) (sequenceNumber ^ (sequenceNumber >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + AppData other = (AppData) obj; + if (sequenceNumber != other.sequenceNumber) + return false; + return true; + } + + } } Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-16 21:05:13 UTC (rev 18) @@ -84,7 +84,7 @@ private long lastDataPacketArrivalTime=0; //largest received data packet sequence number(LRSN) - private volatile long largestReceivedSeqNumber=-1; + private volatile long largestReceivedSeqNumber=0; //ACK event related @@ -233,7 +233,6 @@ } processUDTPacket(packet); } - //else System.out.println("no packet."); Thread.yield(); } @@ -337,7 +336,6 @@ protected void onDataPacketReceived(DataPacket dp)throws IOException{ long currentSequenceNumber = dp.getPacketSequenceNumber(); - //check whether to drop this packet n++; if(dropRate>0 && n % dropRate == 0){ @@ -412,6 +410,7 @@ protected void sendNAK(List<Long>sequenceNumbers)throws IOException{ if(sequenceNumbers.size()==0)return; NegativeAcknowledgement nAckPacket= new NegativeAcknowledgement(); + nAckPacket.addLossInfo(sequenceNumbers); nAckPacket.setSession(session); nAckPacket.setDestinationID(session.getDestination().getSocketID()); Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-16 21:05:13 UTC (rev 18) @@ -40,6 +40,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -87,7 +88,7 @@ private final AtomicInteger unacknowledged=new AtomicInteger(0); //for generating data packet sequence numbers - private long nextSequenceNumber=-1; + private long nextSequenceNumber=0; //the largest data packet sequence number that has actually been sent out private volatile long largestSentSequenceNumber=-1; @@ -100,6 +101,8 @@ private volatile boolean stopped=false; + private volatile AtomicReference<CountDownLatch> latchRef=new AtomicReference<CountDownLatch>(); + public UDTSender(UDTSession session,UDPEndPoint endpoint){ this.endpoint= endpoint; this.session=session; @@ -110,6 +113,8 @@ sendBuffer=new ConcurrentHashMap<Long, DataPacket>(MAX_SIZE,0.75f,2); sendQueue = new LinkedBlockingQueue<DataPacket>(MAX_SIZE); lastAckSequenceNumber=session.getInitialSequenceNumber(); + + latchRef.set(new CountDownLatch(1)); start(); } @@ -190,8 +195,8 @@ } protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{ - if(latch!=null)latch.countDown(); - UDTCongestionControl cc=session.getCongestionControl(); + latchRef.get().countDown(); + CongestionControl cc=session.getCongestionControl(); if(acknowledgement.getPacketReceiveRate()>0){ long rtt=acknowledgement.getRoundTripTime(); long rttVar=acknowledgement.getRoundTripTimeVar(); @@ -230,8 +235,6 @@ for(Integer i: nak.getDecodedLossInfo()){ senderLossList.insert(new SenderLossListEntry(i)); } - //update SND TODO - session.getCongestionControl().onNAK(nak.getDecodedLossInfo()); //reset EXP. EXP is in the receiver currently.... maybe move to SOCKET? session.getSocket().getReceiver().resetEXPTimer(); @@ -297,7 +300,7 @@ statistics.incNumberOfCCSlowDownEvents(); return; } - DataPacket dp=sendQueue.poll(10,TimeUnit.MILLISECONDS); + DataPacket dp=sendQueue.poll(100,TimeUnit.MILLISECONDS); if(dp!=null){ lastSentTime=Util.getCurrentTime(); send(dp); @@ -309,8 +312,9 @@ statistics.incNumberOfCCWindowExceededEvents(); } } + Thread.yield(); } - Thread.yield(); + } /** @@ -363,16 +367,14 @@ return senderLossList.isEmpty(); } - private volatile CountDownLatch latch=null; - /** * wait for the next acknowledge * @throws InterruptedException */ - public void waitForAck(long sequenceNumber)throws InterruptedException{ - latch=new CountDownLatch(1); + public synchronized void waitForAck(long sequenceNumber)throws InterruptedException{ while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){ - latch.await(10, TimeUnit.MILLISECONDS); + latchRef.set(new CountDownLatch(1)); + latchRef.get().await(10, TimeUnit.MILLISECONDS); } } Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-16 21:05:13 UTC (rev 18) @@ -62,13 +62,13 @@ protected int receiveBufferSize=64*32768; - protected final UDTCongestionControl cc; + protected final CongestionControl cc; /** * flow window size, i.e. how many data packets are * in-flight at a single time */ - protected int flowWindowSize=32; + protected int flowWindowSize=64; /** * remote UDT entity (address and socket ID) @@ -84,6 +84,12 @@ public static final int DEFAULT_DATAGRAM_SIZE=UDPEndPoint.DATAGRAM_SIZE; /** + * key for a system property defining the CC class to be used + * @see CongestionControl + */ + public static final String CC_CLASS="udt.congestioncontrol.class"; + + /** * Buffer size (i.e. datagram size) * This is negotiated during connection setup */ @@ -99,7 +105,18 @@ statistics=new UDTStatistics(description); mySocketID=nextSocketID.incrementAndGet(); this.destination=destination; - cc=new UDTCongestionControl(this); + + //init configurable CC + String clazzP=System.getProperty(CC_CLASS,UDTCongestionControl.class.getName()); + Object ccObject=null; + try{ + Class<?>clazz=Class.forName(clazzP); + ccObject=clazz.getDeclaredConstructor(UDTSession.class).newInstance(this); + }catch(Exception e){ + ccObject=new UDTCongestionControl(this); + } + cc=(CongestionControl)ccObject; + System.out.println("using "+cc.getClass().getName()); } public abstract void received(UDTPacket packet, Destination peer); @@ -108,7 +125,7 @@ return socket; } - public UDTCongestionControl getCongestionControl() { + public CongestionControl getCongestionControl() { return cc; } Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-16 21:05:13 UTC (rev 18) @@ -182,7 +182,7 @@ packet.setData(chunk); //put the packet into the send queue while(!sender.sendUdtPacket(packet, timeout, units)){ - System.out.println("WAIT"); + System.out.println("SOCKET WAIT"); } } if(length>0)active=true; Modified: udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java 2010-04-16 21:05:13 UTC (rev 18) @@ -109,7 +109,7 @@ */ public void addLossInfo(long firstSequenceNumber, long lastSequenceNumber) { //check if we really need an interval - if(lastSequenceNumber-firstSequenceNumber==1){ + if(lastSequenceNumber-firstSequenceNumber==0){ addLossInfo(firstSequenceNumber); return; } Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-16 21:05:13 UTC (rev 18) @@ -32,7 +32,7 @@ package udt.receiver; import java.util.ArrayList; -import java.util.Iterator; +import java.util.Arrays; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; @@ -50,25 +50,18 @@ public ReceiverLossList(){ backingList = new PriorityBlockingQueue<ReceiverLossListEntry>(16); - } + } public void insert(ReceiverLossListEntry entry){ - backingList.add(entry); + synchronized (backingList) { + if(!backingList.contains(entry)){ + backingList.add(entry); + } + } } - - public void remove(ReceiverLossListEntry obj){ - backingList.remove(obj); - } public void remove(long seqNo){ - Iterator<ReceiverLossListEntry>iterator=backingList.iterator(); - while(iterator.hasNext()){ - ReceiverLossListEntry e=iterator.next(); - if(e.getSequenceNumber()==seqNo){ - iterator.remove(); - break; - } - } + backingList.remove(new ReceiverLossListEntry(seqNo)); } public boolean contains(ReceiverLossListEntry obj){ @@ -102,8 +95,10 @@ public List<Long>getFilteredSequenceNumbers(long RTT, boolean doFeedback){ List<Long>result=new ArrayList<Long>(); long now=Util.getCurrentTime(); - for(ReceiverLossListEntry e: backingList){ - if( (now-e.getLastFeedbackTime())>2*RTT){ + ReceiverLossListEntry[]sorted=backingList.toArray(new ReceiverLossListEntry[0]); + Arrays.sort(sorted); + for(ReceiverLossListEntry e: sorted){ + if( (now-e.getLastFeedbackTime())>e.getK()*RTT){ result.add(e.getSequenceNumber()); if(doFeedback)e.feedback(); } Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-16 21:05:13 UTC (rev 18) @@ -48,7 +48,9 @@ * @param sequenceNumber */ public ReceiverLossListEntry(long sequenceNumber){ - if(sequenceNumber<=0)throw new IllegalArgumentException(); + if(sequenceNumber<=0){ + throw new IllegalArgumentException("Got sequence number "+sequenceNumber); + } this.sequenceNumber = sequenceNumber; this.lastFeedbacktime=Util.getCurrentTime(); } @@ -90,4 +92,30 @@ return sequenceNumber+"[k="+k+",time="+lastFeedbacktime+"]"; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (k ^ (k >>> 32)); + result = prime * result + + (int) (sequenceNumber ^ (sequenceNumber >>> 32)); + return result; + } + + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ReceiverLossListEntry other = (ReceiverLossListEntry) obj; + if (sequenceNumber != other.sequenceNumber) + return false; + return true; + } + } Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-16 21:05:13 UTC (rev 18) @@ -49,7 +49,9 @@ } public void insert(SenderLossListEntry obj){ - backingList.add(obj); + synchronized (backingList) { + if(!backingList.contains(obj))backingList.add(obj); + } } public void remove(long seqNo){ @@ -61,6 +63,7 @@ return; } } + //backingList.remove(new SenderLossListEntry(seqNo)); } /** Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-16 21:05:13 UTC (rev 18) @@ -71,4 +71,27 @@ return (int)(sequenceNumber-o.sequenceNumber); } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + (int) (sequenceNumber ^ (sequenceNumber >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SenderLossListEntry other = (SenderLossListEntry) obj; + if (sequenceNumber != other.sequenceNumber) + return false; + return true; + } + } Modified: udt-java/trunk/src/main/java/udt/util/CircularArray.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/CircularArray.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/util/CircularArray.java 2010-04-16 21:05:13 UTC (rev 18) @@ -35,18 +35,23 @@ import java.util.ArrayList; import java.util.List; +/** + * Circular array: the most recent value overwrites the oldest one if there is no more free + * space in the array + */ public class CircularArray<T>{ protected int position=0; + protected boolean haveOverflow=false; - //the maximum number of entries - protected int max=1; - protected List<T>circularArray; + protected final int max; + protected final List<T>circularArray; + /** - * ArrayList von T(object's type). The most recent value overwrite the oldest one - * if no more free space in the array + * Create a new circularArray of the given size + * * @param size */ public CircularArray(int size){ @@ -55,10 +60,7 @@ } /** - * Insert the specified entry at the specified position in this list. - * the most recent value overwrite the oldest one - * if no more free space in the circularArray - * @param entry + * add an entry */ public void add(T entry){ if(position>=max){ @@ -74,7 +76,6 @@ /** * Returns the number of elements in this list - * @return */ public int size(){ return circularArray.size(); @@ -83,7 +84,5 @@ public String toString(){ return circularArray.toString(); } - - - + } Modified: udt-java/trunk/src/main/java/udt/util/FlowWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-16 21:05:13 UTC (rev 18) @@ -74,6 +74,9 @@ */ @Override public boolean offer(E e) { + if(contains(e)){ + return true; + } if(size()<capacity){ return super.offer(e); }else return false; Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-16 21:05:13 UTC (rev 18) @@ -33,7 +33,6 @@ package udt.util; import java.io.File; -import java.io.FileOutputStream; import java.io.FileWriter; import java.io.IOException; import java.security.MessageDigest; Added: udt-java/trunk/src/test/java/udt/NullCongestionControl.java =================================================================== --- udt-java/trunk/src/test/java/udt/NullCongestionControl.java (rev 0) +++ udt-java/trunk/src/test/java/udt/NullCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18) @@ -0,0 +1,48 @@ +package udt; + +import java.util.List; + +public class NullCongestionControl implements CongestionControl { + + private final UDTSession session; + + public NullCongestionControl(UDTSession session){ + this.session=session; + } + + public void close() { + } + + public long getCongestionWindowSize() { + return Long.MAX_VALUE; + } + + public double getSendInterval() { + return 0; + } + + public void init() { + } + + public void onACK(long ackSeqno) { + } + + public void onNAK(List<Integer> lossInfo) { + } + + public void onPacketReceive(long packetSeqNo) { + } + + public void onPacketSend(long packetSeqNo) { + } + + public void onTimeout() { + } + + public void setPacketArrivalRate(long rate, long linkCapacity) { + } + + public void setRTT(long rtt, long rttVar) { + } + +} Property changes on: udt-java/trunk/src/test/java/udt/NullCongestionControl.java ___________________________________________________________________ Added: svn:mime-type + text/plain Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-16 21:05:13 UTC (rev 18) @@ -7,10 +7,12 @@ import java.util.logging.Level; import java.util.logging.Logger; +import udt.NullCongestionControl; import udt.UDTClient; import udt.UDTInputStream; import udt.UDTReceiver; import udt.UDTServerSocket; +import udt.UDTSession; import udt.UDTSocket; import udt.UDTTestBase; import udt.util.UDTStatistics; @@ -20,7 +22,8 @@ boolean running=false; //how many - int num_packets=200; + int num_packets=100; + //how large is a single packet int size=1*1024*1024; @@ -30,6 +33,7 @@ public void test1()throws Exception{ Logger.getLogger("udt").setLevel(Level.INFO); + System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName()); UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-16 13:15:20 UTC (rev 17) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-16 21:05:13 UTC (rev 18) @@ -3,20 +3,17 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; -import java.nio.ByteBuffer; import java.util.Random; import junit.framework.TestCase; /** * send some data over a UDP connection and measure performance - * */ public class UDPTest extends TestCase { - final int BUFSIZE=32768; - final int num_packets=10*1000; - final int packetSize=1024; + final int num_packets=100*1000; + final int packetSize=1500; public void test1()throws Exception{ runServer(); @@ -25,20 +22,15 @@ //generate a test array with random content N=num_packets*packetSize; - byte[]data=new byte[N]; + byte[]data=new byte[packetSize]; new Random().nextBytes(data); long start=System.currentTimeMillis(); - ByteBuffer bb=ByteBuffer.wrap(data); - DatagramPacket dp=new DatagramPacket(new byte[BUFSIZE],BUFSIZE); + DatagramPacket dp=new DatagramPacket(new byte[packetSize],packetSize); dp.setAddress(InetAddress.getByName("localhost")); dp.setPort(65321); - - System.out.println("Sending data block of <"+N+"> bytes"); - while(bb.remaining()>0){ - int len=Math.min(bb.remaining(),BUFSIZE); - byte[]chunk=new byte[len]; - bb.get(chunk); - dp.setData(chunk); + System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes"); + for(int i=0;i<num_packets;i++){ + dp.setData(data); s.send(dp); } System.out.println("Finished sending."); @@ -46,7 +38,8 @@ System.out.println("Server stopped."); long end=System.currentTimeMillis(); System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms"); - System.out.println("Rate "+N/(end-start)+" Kbytes/sec"); + System.out.println("Rate "+N/1000/(end-start)+" Mbytes/sec"); + System.out.println("Rate "+num_packets+" packets/sec"); System.out.println("Server received: "+total); } @@ -61,7 +54,7 @@ Runnable serverProcess=new Runnable(){ public void run(){ try{ - byte[]buf=new byte[BUFSIZE]; + byte[]buf=new byte[packetSize]; DatagramPacket dp=new DatagramPacket(buf,buf.length); while(true){ serverSocket.receive(dp); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |