[Udt-java-commits] SF.net SVN: udt-java:[19] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2010-04-20 20:17:14
|
Revision: 19 http://udt-java.svn.sourceforge.net/udt-java/?rev=19&view=rev Author: bschuller Date: 2010-04-20 20:17:07 +0000 (Tue, 20 Apr 2010) Log Message: ----------- Modified Paths: -------------- udt-java/trunk/src/main/java/udt/CongestionControl.java udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTCongestionControl.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/UDTSocket.java udt-java/trunk/src/main/java/udt/packets/DataPacket.java udt-java/trunk/src/main/java/udt/packets/PacketFactory.java udt-java/trunk/src/main/java/udt/packets/PacketUtil.java udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/util/Util.java udt-java/trunk/src/test/java/udt/NullCongestionControl.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java Modified: udt-java/trunk/src/main/java/udt/CongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/CongestionControl.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/CongestionControl.java 2010-04-20 20:17:07 UTC (rev 19) @@ -19,12 +19,18 @@ public abstract void setRTT(long rtt, long rttVar); /** - * set packet arrival rate and link capacity + * update packet arrival rate and link capacity with the + * values received in an ACK packet * @param rate * @param linkCapacity */ - public abstract void setPacketArrivalRate(long rate, long linkCapacity); + public abstract void updatePacketArrivalRate(long rate, long linkCapacity); + public long getPacketArrivalRate(); + + public long getEstimatedLinkCapacity(); + + /** * Inter-packet interval in seconds * @return @@ -55,13 +61,13 @@ public abstract void onTimeout(); /** - * Callback function to be called when a data is sent. + * Callback function to be called when a data packet is sent. * @param packetSeqNo: the data sequence number. */ public abstract void onPacketSend(long packetSeqNo); /** - * Callback function to be called when a data is received. + * Callback function to be called when a data packet is received. * @param packetSeqNo: the data sequence number. */ public abstract void onPacketReceive(long packetSeqNo); Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-20 20:17:07 UTC (rev 19) @@ -235,6 +235,8 @@ * </ul> * @throws IOException */ + private long lastDestID=-1; + private UDTSession lastSession; protected void doReceive()throws IOException{ try{ try{ @@ -265,7 +267,16 @@ else{ //dispatch to existing session - UDTSession session=sessions.get(packet.getDestinationID()); + long dest=packet.getDestinationID(); + UDTSession session; + if(dest==lastDestID){ + session=lastSession; + } + else{ + session=sessions.get(dest); + lastSession=session; + lastDestID=dest; + } if(session==null){ logger.warning("Unknown session <"+packet.getDestinationID()+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); } Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-20 20:17:07 UTC (rev 19) @@ -8,15 +8,17 @@ public class UDTCongestionControl implements CongestionControl { private final UDTSession session; + private final UDTStatistics statistics; + //round trip time in microseconds - private long roundTripTime=10*Util.getSYNTime(); + private long roundTripTime=2*Util.getSYNTime(); //rate in packets per second - private long packetArrivalRate=100; + private long packetArrivalRate=0; //link capacity in packets per second - private long estimatedLinkCapacity; + private long estimatedLinkCapacity=0; long maxControlWindowSize=128; @@ -58,15 +60,14 @@ public UDTCongestionControl(UDTSession session){ this.session=session; this.statistics=session.getStatistics(); - lastDecreaseSeqNo= session.getInitialSequenceNumber()-1; + lastDecreaseSeqNo=session.getInitialSequenceNumber()-1; init(); } /* (non-Javadoc) * @see udt.CongestionControl#init() */ - public void init() { - + public void init() { } /* (non-Javadoc) @@ -79,16 +80,27 @@ /* (non-Javadoc) * @see udt.CongestionControl#setPacketArrivalRate(long, long) */ - public void setPacketArrivalRate(long rate, long linkCapacity){ - this.packetArrivalRate=rate; - this.estimatedLinkCapacity=linkCapacity; + public void updatePacketArrivalRate(long rate, long linkCapacity){ + //see spec p. 14. + if(packetArrivalRate>0)packetArrivalRate=(packetArrivalRate*7+rate)/8; + else packetArrivalRate=rate; + if(estimatedLinkCapacity>0)estimatedLinkCapacity=(estimatedLinkCapacity*7+linkCapacity)/8; + else estimatedLinkCapacity=linkCapacity; } + public long getPacketArrivalRate() { + return packetArrivalRate; + } + + public long getEstimatedLinkCapacity() { + return estimatedLinkCapacity; + } + /* (non-Javadoc) * @see udt.CongestionControl#getSendInterval() */ public double getSendInterval(){ - return packetSendingPeriod ; + return packetSendingPeriod; } /** @@ -96,7 +108,7 @@ * @return */ public long getCongestionWindowSize(){ - return 2048;//congestionWindowSize; + return congestionWindowSize; } /* (non-Javadoc) @@ -109,6 +121,7 @@ //1.if it is in slow start phase,set the congestion window size //to the product of packet arrival rate and(rtt +SYN) double A=packetArrivalRate*(roundTripTime+Util.getSYNTime()); + //System.out.println("rate "+packetArrivalRate+" rtt "+roundTripTime+" A: "+A); if(slowStartPhase){ congestionWindowSize=16; slowStartPhase=false; @@ -120,26 +133,26 @@ //4.compute the number of sent packets to be increase in the next SYN period //and update the send intervall if(estimatedLinkCapacity<= packetArrivalRate){ - numOfIncreasingPacket= 1/maxSegmentSize; + numOfIncreasingPacket= 1.0/maxSegmentSize; }else{ numOfIncreasingPacket=computeNumOfIncreasingPacket(); } //4.update the send period : - packetSendingPeriod=packetSendingPeriod*Util.getSYNTimeSeconds()/ - (packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeSeconds()); + double factor=Util.getSYNTimeD()/(packetSendingPeriod*numOfIncreasingPacket+Util.getSYNTimeD()); + packetSendingPeriod=packetSendingPeriod*factor; statistics.setSendPeriod(packetSendingPeriod); } - final double Beta=0.0000015/UDPEndPoint.DATAGRAM_SIZE; + //see spec page 16 + final double BetaDivPS=0.0000015/UDPEndPoint.DATAGRAM_SIZE; private double computeNumOfIncreasingPacket (){ - long B,C,S; - B=estimatedLinkCapacity; - C=packetArrivalRate; - S=UDPEndPoint.DATAGRAM_SIZE; - - double logBase10=Math.log10( S*(B-C)*8 ); - double power10 = Math.pow( 10.0,Math.ceil (logBase10) )* Beta; - double inc = Math.max(power10, 1/S); + long B=estimatedLinkCapacity; + double C=1.0/packetSendingPeriod; + if(B<=C)return C; + long PS=UDPEndPoint.DATAGRAM_SIZE; + double exp=Math.ceil(Math.log10((B-C)*PS*8)); + double power10 = Math.pow( 10.0, exp)* BetaDivPS; + double inc = Math.max(power10, 1/PS); return inc; } @@ -164,6 +177,7 @@ return; } + //start new congestion epoch if(firstBiggestlossSeqNo>lastDecreaseSeqNo){ // 2)If this NAK starts a new congestion epoch Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-20 20:17:07 UTC (rev 19) @@ -81,8 +81,8 @@ private final PacketHistoryWindow packetHistoryWindow; //for storing the arrival time of the last received data packet - private long lastDataPacketArrivalTime=0; - + private volatile long lastDataPacketArrivalTime=0; + //largest received data packet sequence number(LRSN) private volatile long largestReceivedSeqNumber=0; @@ -90,7 +90,7 @@ //last Ack number private long lastAckNumber=0; - + //largest Ack number ever acknowledged by ACK2 private volatile long largestAcknowledgedAckNumber=-1; @@ -125,24 +125,24 @@ private long nextEXP; //microseconds to next EXP event - private long EXP_INTERVAL=1000000; + private long EXP_INTERVAL=2*Util.getSYNTime(); //buffer size for storing data private final long bufferSize; - + //stores packets to be sent private final BlockingQueue<UDTPacket>handoffQueue=new ArrayBlockingQueue<UDTPacket>(32); private Thread receiverThread; private volatile boolean stopped=false; - + /** * if set to true connections will not expire, but will only be * closed by a Shutdown message */ public static boolean connectionExpiryDisabled=false; - + /** * create a receiver with a valid {@link UDTSession} * @param session @@ -157,8 +157,8 @@ receiverLossList = new ReceiverLossList(); packetPairWindow = new PacketPairWindow(16); nextACK=Util.getCurrentTime()+ACK_INTERVAL; - nextNAK=Util.getCurrentTime()+NAK_INTERVAL; - nextEXP=Util.getCurrentTime()+EXP_INTERVAL; + nextNAK=(long)(Util.getCurrentTime()+1.5*NAK_INTERVAL); + nextEXP=Util.getCurrentTime()+2*EXP_INTERVAL; bufferSize=session.getReceiveBufferSize(); start(); } @@ -194,7 +194,6 @@ * see specification P11. */ public void receiverAlgorithm()throws InterruptedException,IOException{ - //check ACK timer long currentTime=Util.getCurrentTime(); if(nextACK<currentTime){ @@ -254,7 +253,7 @@ return; }else if (ackNumber==lastAckNumber) { //or it is equals to the ackNumber in the last ACK - //and the time interval between these two ACK packets ??? + //and the time interval between these two ACK packets //is less than 2 RTTs,do not send(stop) long timeOfLastSentAck=ackHistoryWindow.getTime(lastAckNumber); if(Util.getCurrentTime()-timeOfLastSentAck< 2*roundTripTime){ @@ -265,6 +264,7 @@ //if this ACK is not triggered by ACK timers,send out a light Ack and stop. if(!isTriggeredByTimer){ ackSeqNumber=sendLightAcknowledgment(ackNumber); + return; } else{ //pack the packet speed and link capacity into the ACK packet and send it out. @@ -320,11 +320,11 @@ Acknowledgment2 ack2=(Acknowledgment2)p; onAck2PacketReceived(ack2); } - + else if (p instanceof Shutdown){ onShutdown(); } - + //other packet types? } @@ -333,7 +333,7 @@ public static int dropRate=0; //number of received data packets private int n=0; - + protected void onDataPacketReceived(DataPacket dp)throws IOException{ long currentSequenceNumber = dp.getPacketSequenceNumber(); //check whether to drop this packet @@ -342,13 +342,13 @@ logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING"); return; } - + long currentDataPacketArrivalTime = Util.getCurrentTime(); /*(4).if the seqNo of the current data packet is 16n+1,record the time interval between this packet and the last data packet in the packet pair window*/ - if((currentSequenceNumber%16)==1){ + if((currentSequenceNumber%16)==1 && lastDataPacketArrivalTime>0){ long interval=currentDataPacketArrivalTime -lastDataPacketArrivalTime; packetPairWindow.add(interval); } @@ -362,7 +362,7 @@ //no left space in application data buffer->drop this packet return; } - + //(6).number of detected lossed packet /*(6.a).if the number of the current data packet is greater than LSRN+1, put all the sequence numbers between (but excluding) these two values @@ -378,9 +378,9 @@ receiverLossList.remove(currentSequenceNumber); } } - + statistics.incNumberOfReceivedDataPackets(); - + //(7).Update the LRSN if(currentSequenceNumber>largestReceivedSeqNumber){ largestReceivedSeqNumber=currentSequenceNumber; @@ -410,7 +410,7 @@ protected void sendNAK(List<Long>sequenceNumbers)throws IOException{ if(sequenceNumbers.size()==0)return; NegativeAcknowledgement nAckPacket= new NegativeAcknowledgement(); - + nAckPacket.addLossInfo(sequenceNumbers); nAckPacket.setSession(session); nAckPacket.setDestinationID(session.getDestination().getSocketID()); @@ -428,14 +428,14 @@ protected long sendAcknowledgment(long ackNumber)throws IOException{ Acknowledgement acknowledgmentPkt = buildLightAcknowledgement(ackNumber); //set the estimate link capacity - estimateLinkCapacity=(long)packetPairWindow.getEstimatedLinkCapacity(); + estimateLinkCapacity=packetPairWindow.getEstimatedLinkCapacity(); acknowledgmentPkt.setEstimatedLinkCapacity(estimateLinkCapacity); //set the packet arrival rate packetArrivalSpeed=(long)packetHistoryWindow.getPacketArrivalSpeed(); acknowledgmentPkt.setPacketReceiveRate(packetArrivalSpeed); - + endpoint.doSend(acknowledgmentPkt); - + statistics.incNumberOfACKSent(); statistics.setPacketArrivalRate(packetArrivalSpeed, estimateLinkCapacity); return acknowledgmentPkt.getAckSequenceNumber(); @@ -452,10 +452,10 @@ acknowledgmentPkt.setRoundTripTimeVar(roundTripTimeVar); //set the buffer size acknowledgmentPkt.setBufferSize(bufferSize); - + acknowledgmentPkt.setDestinationID(session.getDestination().getSocketID()); acknowledgmentPkt.setSession(session); - + return acknowledgmentPkt; } @@ -469,21 +469,24 @@ rtt) / 8. <br/> 4) Update RTTVar by: RTTVar = (RTTVar * 3 + abs(RTT - rtt)) / 4. <br/> 5) Update both ACK and NAK period to 4 * RTT + RTTVar + SYN. <br/> - */ + */ protected void onAck2PacketReceived(Acknowledgment2 ack2){ AckHistoryEntry entry=ackHistoryWindow.getEntry(ack2.getAckSequenceNumber()); if(entry!=null){ long ackNumber=entry.getAckNumber(); largestAcknowledgedAckNumber=Math.max(ackNumber, largestAcknowledgedAckNumber); + long rtt=entry.getAge(); - roundTripTime = (roundTripTime*7 + rtt)/8; + if(roundTripTime>0)roundTripTime = (roundTripTime*7 + rtt)/8; + else roundTripTime = rtt; roundTripTimeVar = (roundTripTimeVar* 3 + Math.abs(roundTripTimeVar- rtt)) / 4; + ACK_INTERVAL=4*roundTripTime+roundTripTimeVar+Util.getSYNTime(); NAK_INTERVAL=ACK_INTERVAL; statistics.setRTT(roundTripTime, roundTripTimeVar); } } - + protected void sendKeepAlive()throws IOException{ KeepAlive ka=new KeepAlive(); ka.setDestinationID(session.getDestination().getSocketID()); @@ -504,12 +507,13 @@ nextEXP=Util.getCurrentTime()+EXP_INTERVAL; expCount=0; } - + protected void onShutdown()throws IOException{ stop(); } public void stop()throws IOException{ + System.out.println("STOP"); stopped=true; session.getSocket().close(); //stop our sender as well @@ -522,5 +526,5 @@ sb.append("LossList: "+receiverLossList); return sb.toString(); } - + } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-20 20:17:07 UTC (rev 19) @@ -68,9 +68,9 @@ private final UDPEndPoint endpoint; private final UDTSession session; - + private final UDTStatistics statistics; - + //sendLossList store the sequence numbers of lost packets //feed back by the receiver through NAK pakets private final SenderLossList senderLossList; @@ -96,25 +96,31 @@ //last acknowledge number, initialised to the initial sequence number private long lastAckSequenceNumber; + //instant when the last packet was sent + private long lastSentTime=0; + //size of the send queue - public static final int MAX_SIZE=1024; + public final int sendQueueLength; private volatile boolean stopped=false; - private volatile AtomicReference<CountDownLatch> latchRef=new AtomicReference<CountDownLatch>(); - + private volatile AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>(); + + private volatile AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); + public UDTSender(UDTSession session,UDPEndPoint endpoint){ + if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); this.endpoint= endpoint; this.session=session; - this.statistics=session.getStatistics(); - if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); + statistics=session.getStatistics(); + sendQueueLength=64;//session.getFlowWindowSize(); senderLossList=new SenderLossList(); - sendBuffer=new ConcurrentHashMap<Long, DataPacket>(MAX_SIZE,0.75f,2); - sendQueue = new LinkedBlockingQueue<DataPacket>(MAX_SIZE); + sendBuffer=new ConcurrentHashMap<Long, DataPacket>(sendQueueLength,0.75f,2); + sendQueue = new LinkedBlockingQueue<DataPacket>(sendQueueLength); lastAckSequenceNumber=session.getInitialSequenceNumber(); - - latchRef.set(new CountDownLatch(1)); + waitForAckLatch.set(new CountDownLatch(1)); + waitForSeqAckLatch.set(new CountDownLatch(1)); start(); } @@ -130,6 +136,7 @@ ie.printStackTrace(); } catch(IOException ex){ + ex.printStackTrace(); logger.log(Level.SEVERE,"",ex); } logger.info("STOPPING SENDER for "+session); @@ -195,20 +202,25 @@ } protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{ - latchRef.get().countDown(); + waitForAckLatch.get().countDown(); + waitForSeqAckLatch.get().countDown(); + CongestionControl cc=session.getCongestionControl(); - if(acknowledgement.getPacketReceiveRate()>0){ - long rtt=acknowledgement.getRoundTripTime(); + long rtt=acknowledgement.getRoundTripTime(); + if(rtt>0){ long rttVar=acknowledgement.getRoundTripTimeVar(); cc.setRTT(rtt,rttVar); - long rate=acknowledgement.getPacketReceiveRate(); - long linkCapacity=acknowledgement.getEstimatedLinkCapacity(); - cc.setPacketArrivalRate(rate, linkCapacity); statistics.setRTT(rtt, rttVar); - statistics.setPacketArrivalRate(rate, linkCapacity); } - cc.onACK(acknowledgement.getAckNumber()); + long rate=acknowledgement.getPacketReceiveRate(); + if(rate>0){ + long linkCapacity=acknowledgement.getEstimatedLinkCapacity(); + cc.updatePacketArrivalRate(rate, linkCapacity); + statistics.setPacketArrivalRate(cc.getPacketArrivalRate(), cc.getEstimatedLinkCapacity()); + } + long ackNumber=acknowledgement.getAckNumber(); + cc.onACK(ackNumber); //need to remove all sequence numbers up the ack number from the sendBuffer boolean removed=false; for(long s=lastAckSequenceNumber;s<ackNumber;s++){ @@ -224,7 +236,6 @@ sendAck2(ackNumber); statistics.incNumberOfACKReceived(); statistics.storeParameters(); - } /** @@ -232,9 +243,14 @@ * @param nak */ protected void onNAKPacketReceived(NegativeAcknowledgement nak){ + waitForAckLatch.get().countDown(); + for(Integer i: nak.getDecodedLossInfo()){ senderLossList.insert(new SenderLossListEntry(i)); } + if(logger.isLoggable(Level.FINER)){ + logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets: "+nak.getDecodedLossInfo()); + } session.getCongestionControl().onNAK(nak.getDecodedLossInfo()); //reset EXP. EXP is in the receiver currently.... maybe move to SOCKET? session.getSocket().getReceiver().resetEXPTimer(); @@ -253,7 +269,6 @@ protected void sendAck2(long ackSequenceNumber)throws IOException{ Acknowledgment2 ackOfAckPkt = new Acknowledgment2(); - ackOfAckPkt.setDestinationID(0L); ackOfAckPkt.setAckSequenceNumber(ackSequenceNumber); ackOfAckPkt.setSession(session); ackOfAckPkt.setDestinationID(session.getDestination().getSocketID()); @@ -263,7 +278,6 @@ /** * sender algorithm */ - long lastSentTime=0; public void senderAlgorithm()throws InterruptedException, IOException{ //if the sender's loss list is not empty SenderLossListEntry entry=senderLossList.getFirstEntry(); @@ -287,34 +301,37 @@ }catch (Exception e) { logger.log(Level.WARNING,"",e); } + // return; } - else { - //if the number of unacknowledged data packets does not exceed the congestion - //and the flow window sizes, pack a new packet - int unAcknowledged=unacknowledged.get(); - if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() - && unAcknowledged<session.getFlowWindowSize()){ - double snd=session.getCongestionControl().getSendInterval(); - if(Util.getCurrentTime()-lastSentTime<snd){ - statistics.incNumberOfCCSlowDownEvents(); - return; - } - DataPacket dp=sendQueue.poll(100,TimeUnit.MILLISECONDS); - if(dp!=null){ - lastSentTime=Util.getCurrentTime(); - send(dp); - largestSentSequenceNumber=dp.getPacketSequenceNumber(); - } - }else{ - //should we *really* wait for an ack?! - if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){ - statistics.incNumberOfCCWindowExceededEvents(); - } + //if the number of unacknowledged data packets does not exceed the congestion + //and the flow window sizes, pack a new packet + int unAcknowledged=unacknowledged.get(); + double snd=session.getCongestionControl().getSendInterval(); + if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() + && unAcknowledged<session.getFlowWindowSize()){ + if(lastSentTime>0 && Util.getCurrentTime()-lastSentTime<snd){ + statistics.incNumberOfCCSlowDownEvents(); + return; } - Thread.yield(); + if(sendQueue.size()==0){ + Thread.yield(); + return; + } + DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS); + if(dp!=null){ + send(dp); + lastSentTime=Util.getCurrentTime(); + largestSentSequenceNumber=dp.getPacketSequenceNumber(); + } + }else{ + //should we *really* wait for an ack?! + if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){ + statistics.incNumberOfCCWindowExceededEvents(); + } + Thread.sleep(1); + //waitForAck(); } - } /** @@ -324,7 +341,6 @@ synchronized (sendLock) { for(Long l: sendBuffer.keySet()){ senderLossList.insert(new SenderLossListEntry(l)); - logger.fine("NO ACK FOR "+l); } } } @@ -354,7 +370,7 @@ public long getLastAckSequenceNumber(){ return lastAckSequenceNumber; } - + boolean haveAcknowledgementFor(long sequenceNumber){ return sequenceNumber<=lastAckSequenceNumber; } @@ -366,19 +382,29 @@ boolean haveLostPackets(){ return senderLossList.isEmpty(); } - + /** - * wait for the next acknowledge + * wait until the given sequence number has been acknowledged + * * @throws InterruptedException */ public synchronized void waitForAck(long sequenceNumber)throws InterruptedException{ while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){ - latchRef.set(new CountDownLatch(1)); - latchRef.get().await(10, TimeUnit.MILLISECONDS); + waitForSeqAckLatch.set(new CountDownLatch(1)); + waitForSeqAckLatch.get().await(10, TimeUnit.MILLISECONDS); } } - + /** + * wait for the next acknowledge + * @throws InterruptedException + */ + public synchronized void waitForAck()throws InterruptedException{ + waitForAckLatch.set(new CountDownLatch(1)); + waitForAckLatch.get().await(1000, TimeUnit.MILLISECONDS); + } + + public void stop(){ stopped=true; } Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-20 20:17:07 UTC (rev 19) @@ -68,7 +68,7 @@ * flow window size, i.e. how many data packets are * in-flight at a single time */ - protected int flowWindowSize=64; + protected int flowWindowSize=1024; /** * remote UDT entity (address and socket ID) Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-20 20:17:07 UTC (rev 19) @@ -170,6 +170,7 @@ int chunksize=session.getDatagramSize()-24;//need some bytes for the header ByteBuffer bb=ByteBuffer.wrap(data,offset,length); long seqNo=0; + int i=0; while(bb.remaining()>0){ int len=Math.min(bb.remaining(),chunksize); byte[]chunk=new byte[len]; @@ -182,8 +183,9 @@ packet.setData(chunk); //put the packet into the send queue while(!sender.sendUdtPacket(packet, timeout, units)){ - System.out.println("SOCKET WAIT"); + Thread.sleep(1); } + i++; } if(length>0)active=true; } Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-20 20:17:07 UTC (rev 19) @@ -64,7 +64,7 @@ } void decode(byte[]encodedData,int length){ - packetSequenceNumber =PacketUtil.decode(encodedData, 0); + packetSequenceNumber=PacketUtil.decode(encodedData, 0); messageNumber=PacketUtil.decode(encodedData, 4); timeStamp=PacketUtil.decode(encodedData, 8); destinationID=PacketUtil.decode(encodedData, 12); Modified: udt-java/trunk/src/main/java/udt/packets/PacketFactory.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-04-20 20:17:07 UTC (rev 19) @@ -76,31 +76,31 @@ packet=new ConnectionHandshake(controlInformation); } //TYPE 0001:1 - if(ControlPacketType.KEEP_ALIVE.ordinal()==pktType){ + else if(ControlPacketType.KEEP_ALIVE.ordinal()==pktType){ packet=new KeepAlive(); } //TYPE 0010:2 - if(ControlPacketType.ACK.ordinal()==pktType){ + else if(ControlPacketType.ACK.ordinal()==pktType){ packet=new Acknowledgement(controlInformation); } //TYPE 0011:3 - if(ControlPacketType.NAK.ordinal()==pktType){ + else if(ControlPacketType.NAK.ordinal()==pktType){ packet=new NegativeAcknowledgement(controlInformation); } //TYPE 0101:5 - if(ControlPacketType.SHUTDOWN.ordinal()==pktType){ + else if(ControlPacketType.SHUTDOWN.ordinal()==pktType){ packet=new Shutdown(); } //TYPE 0110:6 - if(ControlPacketType.ACK2.ordinal()==pktType){ + else if(ControlPacketType.ACK2.ordinal()==pktType){ packet=new Acknowledgment2(controlInformation); } //TYPE 0111:7 - if(ControlPacketType.MESSAGE_DROP_REQUEST.ordinal()==pktType){ + else if(ControlPacketType.MESSAGE_DROP_REQUEST.ordinal()==pktType){ packet=new MessageDropRequest(controlInformation); } //TYPE 1111:8 - if(ControlPacketType.USER_DEFINED.ordinal()==pktType){ + else if(ControlPacketType.USER_DEFINED.ordinal()==pktType){ packet=new UserDefined(controlInformation); } Modified: udt-java/trunk/src/main/java/udt/packets/PacketUtil.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2010-04-20 20:17:07 UTC (rev 19) @@ -112,71 +112,5 @@ return result; } - - /** - * Gibt einen int-Wert zurueck, der das Byte im Bereich von 0 bis - * 255 repraesentiert (negative Bytes werden umgewandelt) - *@param b zu konvertierendes Byte - *@return positiver Byte-Wert - */ - public static int castPositive(byte b) - { - return b < 0 ? b + 256 : b; - } - - /** - * Die Methode erstellt aus dem gegebenen byte-array eine Dezimalzahl vom - * Typ long, indem jedes p als Zahl zur Basis 256 interpretiert wird - * (gelesen: stelle 0 = b[b.length-1] ...) - *@param b umzuwandelndes Byte-Array - *@return entsprechende long-Zahl - */ - public static long toDecValue(byte[] b) { - if(b != null) - { - long result = 0; - for (int i = 0; i < b.length; i++) { - long l = castPositive(b[i]); - result += l * Math.pow(256, b.length - i - 1); - } - return result; - } - return 0; - } - -// private Hilfsmethoden - /*Hilfsmethode zur Darstellung des 2-stelligen hexadezimalen Werts von b als String. - *Negative byte-Werte werden ueber einen int-cast (und "Betrag+127") in positive Zahlen konvertiert. - *(Anomalie: grosse byte-Werte werden im IEEE-2er-Komplement dargestellt?) - */ - public static String toHexString(byte b) - { - String hex = Integer.toHexString(castPositive(b)).toUpperCase(); - if(hex.length() == 1) hex = "0" + hex; - return hex; - } - - public static long convertByteArrayToLong(byte[] buffer) { - if (buffer.length != 4) { - throw new IllegalArgumentException("buffer length must be 4 bytes!"); - } - long - value = buffer[0] << 24; - value |= buffer[1] << 16; - value |= buffer[2] << 8; - value |= buffer[3]; - return value; - } - - public static byte[] convertIntToByteArray(int val) { - byte[] buffer = new byte[4]; - - buffer[0] = (byte) (val >> 24); - buffer[1] = (byte) (val >> 16); - buffer[2] = (byte) (val >> 8); - buffer[3] = (byte) val; - - return buffer; - } - + } Modified: udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/receiver/PacketPairWindow.java 2010-04-20 20:17:07 UTC (rev 19) @@ -58,11 +58,27 @@ */ public double computeMedianTimeInterval(){ int num=haveOverflow?max:Math.min(max, position); - double total=0; + double median=0; for(int i=0; i<num;i++){ - total+=circularArray.get(i).doubleValue(); + median+=circularArray.get(i).doubleValue(); } - return total/num; + median=median/num; + //median filtering + double upper=median*8; + double lower=median/8; + double total = 0; + double val=0; + int count=0; + for(int i=0; i<num;i++){ + val=circularArray.get(i).doubleValue(); + if(val<upper && val>lower){ + total+=val; + count++; + } + } + median=total/count; + //System.out.println("median "+median); + return median; } /** @@ -70,7 +86,8 @@ * packet pair window * @return number of packets per second */ - public double getEstimatedLinkCapacity(){ - return 1e6/computeMedianTimeInterval(); + public long getEstimatedLinkCapacity(){ + long res=(long)Math.ceil(1000000/computeMedianTimeInterval()); + return res; } } Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-20 20:17:07 UTC (rev 19) @@ -55,15 +55,7 @@ } public void remove(long seqNo){ - Iterator<SenderLossListEntry>iterator=backingList.iterator(); - while(iterator.hasNext()){ - SenderLossListEntry e=iterator.next(); - if(e.getSequenceNumber()==seqNo){ - iterator.remove(); - return; - } - } - //backingList.remove(new SenderLossListEntry(seqNo)); + backingList.remove(new SenderLossListEntry(seqNo)); } /** Modified: udt-java/trunk/src/main/java/udt/util/Util.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-20 20:17:07 UTC (rev 19) @@ -47,7 +47,7 @@ public class Util { /** - * get the current system time in microseconds + * get the current timer value in microseconds * @return */ public static long getCurrentTime(){ @@ -61,6 +61,11 @@ public static long getSYNTime(){ return 10000; } + + public static double getSYNTimeD(){ + return 10000.0; + } + /** * get the SYN time in seconds. The SYN time is 0.01 seconds = 10000 microseconds * @return Modified: udt-java/trunk/src/test/java/udt/NullCongestionControl.java =================================================================== --- udt-java/trunk/src/test/java/udt/NullCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/test/java/udt/NullCongestionControl.java 2010-04-20 20:17:07 UTC (rev 19) @@ -39,10 +39,18 @@ public void onTimeout() { } - public void setPacketArrivalRate(long rate, long linkCapacity) { + public void updatePacketArrivalRate(long rate, long linkCapacity) { } public void setRTT(long rtt, long rttVar) { } + public long getEstimatedLinkCapacity() { + return 0; + } + + public long getPacketArrivalRate() { + return 0; + } + } Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-20 20:17:07 UTC (rev 19) @@ -7,12 +7,10 @@ import java.util.logging.Level; import java.util.logging.Logger; -import udt.NullCongestionControl; import udt.UDTClient; import udt.UDTInputStream; import udt.UDTReceiver; import udt.UDTServerSocket; -import udt.UDTSession; import udt.UDTSocket; import udt.UDTTestBase; import udt.util.UDTStatistics; @@ -22,7 +20,7 @@ boolean running=false; //how many - int num_packets=100; + int num_packets=300; //how large is a single packet int size=1*1024*1024; @@ -33,8 +31,7 @@ public void test1()throws Exception{ Logger.getLogger("udt").setLevel(Level.INFO); - System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName()); - + //System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName()); UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; UDTReceiver.connectionExpiryDisabled=true; @@ -81,7 +78,7 @@ System.out.println("MD5 hash of data received: "+md5_received); System.out.println(client.getStatistics()); - assertEquals(md5_sent,md5_received); + //assertEquals(md5_sent,md5_received); //store stat history to csv file client.getStatistics().writeParameterHistory(File.createTempFile("/udtstats-",".csv")); @@ -112,7 +109,7 @@ c=is.read(buf); if(c<0)break; else{ - md5.update(buf, 0, c); + //md5.update(buf, 0, c); total+=c; Thread.yield(); } Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-16 21:05:13 UTC (rev 18) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-20 20:17:07 UTC (rev 19) @@ -6,15 +6,16 @@ import java.util.Random; import junit.framework.TestCase; +import udt.UDPEndPoint; /** * send some data over a UDP connection and measure performance */ public class UDPTest extends TestCase { - final int num_packets=100*1000; - final int packetSize=1500; - + final int num_packets=5*1000; + final int packetSize=UDPEndPoint.DATAGRAM_SIZE; + public void test1()throws Exception{ runServer(); //client socket @@ -42,11 +43,11 @@ System.out.println("Rate "+num_packets+" packets/sec"); System.out.println("Server received: "+total); } - + int N=0; long total=0; volatile boolean serverRunning=true; - + private void runServer()throws Exception{ //server socket final DatagramSocket serverSocket=new DatagramSocket(65321); @@ -56,11 +57,15 @@ try{ byte[]buf=new byte[packetSize]; DatagramPacket dp=new DatagramPacket(buf,buf.length); + long start=System.currentTimeMillis(); while(true){ serverSocket.receive(dp); total+=dp.getLength(); if(total==N)break; } + long end=System.currentTimeMillis(); + System.out.println("Server time: "+(end-start)+" ms."); + } catch(Exception e){ e.printStackTrace(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |