[Udt-java-commits] SF.net SVN: udt-java:[21] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2010-04-21 15:45:06
|
Revision: 21 http://udt-java.svn.sourceforge.net/udt-java/?rev=21&view=rev Author: bschuller Date: 2010-04-21 15:45:00 +0000 (Wed, 21 Apr 2010) Log Message: ----------- rate control stuff --- still not so great IMO Modified Paths: -------------- udt-java/trunk/src/main/java/udt/ClientSession.java udt-java/trunk/src/main/java/udt/UDTCongestionControl.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Modified: udt-java/trunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ClientSession.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/main/java/udt/ClientSession.java 2010-04-21 15:45:00 UTC (rev 21) @@ -107,8 +107,6 @@ } active = true; try{ - //packet received means we should not yet expire - socket.getReceiver().resetEXPTimer(); if(packet.forSender()){ socket.getSender().receive(lastPacket); }else{ Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-21 15:45:00 UTC (rev 21) @@ -1,12 +1,16 @@ package udt; import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; import udt.util.UDTStatistics; import udt.util.Util; public class UDTCongestionControl implements CongestionControl { + private static final Logger logger=Logger.getLogger(UDTCongestionControl.class.getName()); + private final UDTSession session; private final UDTStatistics statistics; @@ -49,14 +53,16 @@ long nACKCount=1; //number of decreases in a congestion epoch - long congestionEpochDecreaseCount=1; + long decCount=1; //random threshold on decrease by number of loss events - long decreaseRandom; + long decreaseRandom=1; //average number of NAKs per congestion long averageNACKNum; + boolean loss=false; + public UDTCongestionControl(UDTSession session){ this.session=session; this.statistics=session.getStatistics(); @@ -115,41 +121,65 @@ * @see udt.CongestionControl#onACK(long) */ public void onACK(long ackSeqno){ - //the fixed size of a UDT packet - long maxSegmentSize=UDPEndPoint.DATAGRAM_SIZE; - - //1.if it is in slow start phase,set the congestion window size - //to the product of packet arrival rate and(rtt +SYN) - double A=packetArrivalRate*(roundTripTime+Util.getSYNTime()); - //System.out.println("rate "+packetArrivalRate+" rtt "+roundTripTime+" A: "+A); + //increase window during slow start if(slowStartPhase){ - congestionWindowSize=16; - slowStartPhase=false; - return; + congestionWindowSize+=ackSeqno-lastAckSeqNumber; + lastAckSeqNumber = ackSeqno; + + //but not beyond a maximum size + if(congestionWindowSize>session.getFlowWindowSize()){ + System.out.println("slow start ends on ACK"); + slowStartPhase=false; + if(packetArrivalRate>0){ + packetSendingPeriod=1000000.0/packetArrivalRate; + } + else{ + packetSendingPeriod=(double)congestionWindowSize/(roundTripTime+Util.getSYNTimeD()); + } + } + }else{ + //1.if it is not in slow start phase,set the congestion window size + //to the product of packet arrival rate and(rtt +SYN) + double A=packetArrivalRate/1000000.0*(roundTripTime+Util.getSYNTimeD()); congestionWindowSize=(long)A+16; + if(logger.isLoggable(Level.FINER)){ + logger.finer("receive rate "+packetArrivalRate+" rtt "+roundTripTime+" set to window size: "+(A+16)); + } } + //no rate increase in slow start + if(slowStartPhase)return; + + if(loss){ + loss=false; + return; + } + //4.compute the number of sent packets to be increase in the next SYN period //and update the send intervall - if(estimatedLinkCapacity<= packetArrivalRate){ - numOfIncreasingPacket= 1.0/maxSegmentSize; - }else{ - numOfIncreasingPacket=computeNumOfIncreasingPacket(); - } + numOfIncreasingPacket=computeNumOfIncreasingPacket(); + //4.update the send period : double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD()); - packetSendingPeriod=packetSendingPeriod*factor; + packetSendingPeriod=factor*packetSendingPeriod; + //packetSendingPeriod=0.995*packetSendingPeriod; + //System.out.println("dec snd factor "+factor+" to "+packetSendingPeriod); + statistics.setSendPeriod(packetSendingPeriod); } + private final long PS=UDPEndPoint.DATAGRAM_SIZE; + private final double BetaDivPS=0.0000015/PS; + //see spec page 16 - final double BetaDivPS=0.0000015/UDPEndPoint.DATAGRAM_SIZE; private double computeNumOfIncreasingPacket (){ - long B=estimatedLinkCapacity; - double C=1.0/packetSendingPeriod; - if(B<=C)return C; - long PS=UDPEndPoint.DATAGRAM_SIZE; + //link capacity and sending speed, in packets per second + double B=estimatedLinkCapacity; + double C=1000000.0/packetSendingPeriod; + + if(B<=C)return 1.0/UDPEndPoint.DATAGRAM_SIZE; + double exp=Math.ceil(Math.log10((B-C)*PS*8)); double power10 = Math.pow( 10.0, exp)* BetaDivPS; double inc = Math.max(power10, 1/PS); @@ -160,49 +190,49 @@ * @see udt.CongestionControl#onNAK(java.util.List) */ public void onNAK(List<Integer>lossInfo){ + loss=true; long firstBiggestlossSeqNo=lossInfo.get(lossInfo.size()-1); - long currentMaxSequenceNumber=session.getSocket().getSender().getCurrentSequenceNumber(); - lastAckSeqNumber = currentMaxSequenceNumber; + nACKCount++; /*1) If it is in slow start phase, set inter-packet interval to 1/recvrate. Slow start ends. Stop. */ if(slowStartPhase){ if(packetArrivalRate>0){ - packetSendingPeriod = 1e6/packetArrivalRate; + packetSendingPeriod = 100000.0/packetArrivalRate; } else{ - packetSendingPeriod=congestionWindowSize*(roundTripTime+Util.getSYNTime()); + packetSendingPeriod=congestionWindowSize/(roundTripTime+Util.getSYNTime()); } slowStartPhase = false; return; } - - //start new congestion epoch + long currentMaxSequenceNumber=session.getSocket().getSender().getCurrentSequenceNumber(); + + // 2)If this NAK starts a new congestion epoch if(firstBiggestlossSeqNo>lastDecreaseSeqNo){ - // 2)If this NAK starts a new congestion epoch + // -increase inter-packet interval packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125); // -Update AvgNAKNum(the average number of NAKs per congestion) averageNACKNum = (int)Math.ceil(averageNACKNum*0.875 + nACKCount*0.125); - // -reset NAKCount to 1, + // -reset NAKCount and DecCount to 1, nACKCount=1; - /* - compute DecRandom to a random (average distribution) number between 1 and AvgNAKNum.. */ + decCount=1; + /* - compute DecRandom to a random (average distribution) number between 1 and AvgNAKNum */ decreaseRandom =(int)Math.ceil((averageNACKNum-1)*Math.random()+1); // -Update LastDecSeq lastDecreaseSeqNo = currentMaxSequenceNumber; // -Stop. statistics.setSendPeriod(packetSendingPeriod); - return; } //* 3) If DecCount <= 5, and NAKCount == DecCount * DecRandom: - if(congestionEpochDecreaseCount<=5 && - nACKCount==congestionEpochDecreaseCount*decreaseRandom){ + if(decCount<=5 && nACKCount==decCount*decreaseRandom){ // a. Update SND period: SND = SND * 1.125; packetSendingPeriod = Math.ceil(packetSendingPeriod*1.125); // b. Increase DecCount by 1; - congestionEpochDecreaseCount++; + decCount++; // c. Record the current largest sent sequence number (LastDecSeq). lastDecreaseSeqNo= currentMaxSequenceNumber; statistics.setSendPeriod(packetSendingPeriod); Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-21 15:45:00 UTC (rev 21) @@ -110,7 +110,7 @@ long packetArrivalSpeed; //round trip time, calculated from ACK/ACK2 pairs - long roundTripTime=10*Util.getSYNTime(); + long roundTripTime=50*1000; //round trip time variance long roundTripTimeVar=roundTripTime/2; @@ -125,8 +125,13 @@ private long nextEXP; //microseconds to next EXP event - private long EXP_INTERVAL=10*Util.getSYNTime(); + private long EXP_INTERVAL=Util.getSYNTime(); + //instant when the session was created (for expiry checking) + private final long sessionUpSince; + //milliseconds to timeout a new session that stays idle + private final long IDLE_TIMEOUT = 3*60*1000; + //buffer size for storing data private final long bufferSize; @@ -150,6 +155,7 @@ public UDTReceiver(UDTSession session,UDPEndPoint endpoint){ this.endpoint = endpoint; this.session=session; + this.sessionUpSince=System.currentTimeMillis(); this.statistics=session.getStatistics(); if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); ackHistoryWindow = new AckHistoryWindow(16); @@ -295,7 +301,7 @@ UDTSender sender=session.getSocket().getSender(); //put all the unacknowledged packets in the senders loss list sender.putUnacknowledgedPacketsIntoLossList(); - if(expCount>16){ + if(expCount>16 && System.currentTimeMillis()-sessionUpSince > IDLE_TIMEOUT){ if(!connectionExpiryDisabled &&!stopped){ sendShutdown(); stop(); @@ -508,6 +514,10 @@ expCount=0; } + protected void resetEXPCount(){ + expCount=0; + } + protected void onShutdown()throws IOException{ stop(); } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-21 15:45:00 UTC (rev 21) @@ -96,9 +96,6 @@ //last acknowledge number, initialised to the initial sequence number private long lastAckSequenceNumber; - //instant when the last packet was sent - private long lastSentTime=0; - //size of the send queue public final int sendQueueLength; @@ -199,6 +196,9 @@ NegativeAcknowledgement nak=(NegativeAcknowledgement)p; onNAKPacketReceived(nak); } + else if (p instanceof KeepAlive) { + session.getSocket().getReceiver().resetEXPCount(); + } } protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{ @@ -248,14 +248,16 @@ for(Integer i: nak.getDecodedLossInfo()){ senderLossList.insert(new SenderLossListEntry(i)); } - if(logger.isLoggable(Level.FINER)){ - logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets: "+nak.getDecodedLossInfo()); - } session.getCongestionControl().onNAK(nak.getDecodedLossInfo()); - //reset EXP. EXP is in the receiver currently.... maybe move to SOCKET? session.getSocket().getReceiver().resetEXPTimer(); statistics.incNumberOfNAKReceived(); statistics.storeParameters(); + + //if(logger.isLoggable(Level.FINER)){ + System.out.println("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, " + +"set send period to "+session.getCongestionControl().getSendInterval()); + //} + return; } @@ -279,16 +281,19 @@ * sender algorithm */ public void senderAlgorithm()throws InterruptedException, IOException{ + long iterationStart=Util.getCurrentTime(); //if the sender's loss list is not empty SenderLossListEntry entry=senderLossList.getFirstEntry(); if (entry!=null) { long seqNumber = entry.getSequenceNumber(); + + //TODO //if the current seqNumber is 16n,check the timeOut in the //loss list and send a message drop request. - if((seqNumber%16)==0){ - //TODO + //if((seqNumber%16)==0){ //sendLossList.checkTimeOut(timeToLive); - } + //} + try { //retransmit the packet with the first entry in the list //as sequence number and remove it from the list @@ -307,13 +312,13 @@ //if the number of unacknowledged data packets does not exceed the congestion //and the flow window sizes, pack a new packet int unAcknowledged=unacknowledged.get(); - double snd=session.getCongestionControl().getSendInterval(); + if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() && unAcknowledged<session.getFlowWindowSize()){ - if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){ - statistics.incNumberOfCCSlowDownEvents(); - return; - } +// if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){ +// statistics.incNumberOfCCSlowDownEvents(); +// return; +// } if(sendQueue.size()==0){ Thread.yield(); return; @@ -321,7 +326,6 @@ DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS); if(dp!=null){ send(dp); - lastSentTime=Util.getCurrentTime(); largestSentSequenceNumber=dp.getPacketSequenceNumber(); } }else{ @@ -332,6 +336,17 @@ Thread.sleep(1); //waitForAck(); } + + //wait + double snd=session.getCongestionControl().getSendInterval(); + long passed=Util.getCurrentTime()-iterationStart; + while(snd-passed>0){ + //busy wait, but we cannot wait with microsecond precision + if(snd-passed>750)Thread.sleep(1); + statistics.incNumberOfCCSlowDownEvents(); + passed=Util.getCurrentTime()-iterationStart; + } + } /** Modified: udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-21 15:45:00 UTC (rev 21) @@ -35,7 +35,8 @@ import udt.util.CircularArray; /** - * a circular array that records time intervals between two data packets + * a circular array that records time intervals between two probing data packets. + * It is used to determine the estimated link capacity. * @see {@link CircularArray} * */ @@ -63,6 +64,7 @@ median+=circularArray.get(i).doubleValue(); } median=median/num; + //median filtering double upper=median*8; double lower=median/8; @@ -76,9 +78,9 @@ count++; } } - median=total/count; - //System.out.println("median "+median); - return median; + double res=total/count; + //System.out.println("median: "+median+" filtered "+res); + return res; } /** Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-21 15:45:00 UTC (rev 21) @@ -18,6 +18,8 @@ }while(!serverStarted); File f=new File("src/test/java/datafile"); + f=new File("/tmp/200MB"); + File tmp=File.createTempFile("udtest-", null); String[] args=new String[]{"localhost","65321",f.getAbsolutePath(),tmp.getAbsolutePath()}; Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-20 21:21:15 UTC (rev 20) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-21 15:45:00 UTC (rev 21) @@ -20,7 +20,7 @@ boolean running=false; //how many - int num_packets=300; + int num_packets=200; //how large is a single packet int size=1*1024*1024; @@ -34,7 +34,7 @@ //System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName()); UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; - UDTReceiver.connectionExpiryDisabled=true; + //UDTReceiver.connectionExpiryDisabled=true; doTest(); } @@ -109,7 +109,7 @@ c=is.read(buf); if(c<0)break; else{ - //md5.update(buf, 0, c); + md5.update(buf, 0, c); total+=c; Thread.yield(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |