[Udt-java-commits] SF.net SVN: udt-java:[64] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2011-08-16 17:56:14
|
Revision: 64 http://udt-java.svn.sourceforge.net/udt-java/?rev=64&view=rev Author: bschuller Date: 2011-08-16 17:56:07 +0000 (Tue, 16 Aug 2011) Log Message: ----------- fix two bugs: thanks to ajsenf (Alexander Senf) see https://sourceforge.net/projects/udt-java/forums/forum/1109269/topic/4615162?message=10597365 Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTClient.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Modified: udt-java/trunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTClient.java 2011-08-13 00:24:18 UTC (rev 63) +++ udt-java/trunk/src/main/java/udt/UDTClient.java 2011-08-16 17:56:07 UTC (rev 64) @@ -36,6 +36,7 @@ import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -115,15 +116,15 @@ } /** - * flush outstanding data (and make sure it is acknowledged) + * flush outstanding data, with the specified maximum waiting time + * @param timeOut - timeout in millis (if smaller than 0, no timeout is used) * @throws IOException * @throws InterruptedException */ - public void flush()throws IOException, InterruptedException{ + public void flush()throws IOException, InterruptedException, TimeoutException{ clientSession.getSocket().flush(); } - public void shutdown()throws IOException{ if (clientSession.isReady()&& clientSession.active==true) Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2011-08-13 00:24:18 UTC (rev 63) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-08-16 17:56:07 UTC (rev 64) @@ -39,7 +39,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -110,11 +111,9 @@ private volatile CountDownLatch startLatch=new CountDownLatch(1); //used by the sender to wait for an ACK - private final AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>(); + private final ReentrantLock ackLock=new ReentrantLock(); + private final Condition ackCondition=ackLock.newCondition(); - //used by the sender to wait for an ACK of a certain sequence number - private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); - private final boolean storeStatistics; private final int chunksize; @@ -130,8 +129,6 @@ flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize); lastAckSequenceNumber=session.getInitialSequenceNumber(); currentSequenceNumber=session.getInitialSequenceNumber()-1; - waitForAckLatch.set(new CountDownLatch(1)); - waitForSeqAckLatch.set(new CountDownLatch(1)); storeStatistics=Boolean.getBoolean("udt.sender.storeStatistics"); initMetrics(); doStart(); @@ -278,8 +275,9 @@ } protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{ - waitForAckLatch.get().countDown(); - waitForSeqAckLatch.get().countDown(); + ackLock.lock(); + ackCondition.signal(); + ackLock.unlock(); CongestionControl cc=session.getCongestionControl(); long rtt=acknowledgement.getRoundTripTime(); @@ -407,6 +405,8 @@ } } + private final DataPacket retransmit=new DataPacket(); + /** * re-transmit an entry from the sender loss list * @param entry @@ -416,13 +416,11 @@ //retransmit the packet and remove it from the list byte[]data=sendBuffer.get(seqNumber); if(data!=null){ - //System.out.println("re-transmit "+data); - DataPacket packet=new DataPacket(); - packet.setPacketSequenceNumber(seqNumber); - packet.setSession(session); - packet.setDestinationID(session.getDestination().getSocketID()); - packet.setData(data); - endpoint.doSend(packet); + retransmit.setPacketSequenceNumber(seqNumber); + retransmit.setSession(session); + retransmit.setDestinationID(session.getDestination().getSocketID()); + retransmit.setData(data); + endpoint.doSend(retransmit); statistics.incNumberOfRetransmittedDataPackets(); } }catch (Exception e) { @@ -486,18 +484,37 @@ */ public void waitForAck(long sequenceNumber)throws InterruptedException{ while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){ - waitForSeqAckLatch.set(new CountDownLatch(1)); - waitForSeqAckLatch.get().await(10, TimeUnit.MILLISECONDS); + ackLock.lock(); + try{ + ackCondition.await(100, TimeUnit.MICROSECONDS); + }finally{ + ackLock.unlock(); + } } } + public void waitForAck(long sequenceNumber, int timeout)throws InterruptedException{ + while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){ + ackLock.lock(); + try{ + ackCondition.await(timeout, TimeUnit.MILLISECONDS); + }finally{ + ackLock.unlock(); + } + } + } + /** * wait for the next acknowledge * @throws InterruptedException */ public void waitForAck()throws InterruptedException{ - waitForAckLatch.set(new CountDownLatch(1)); - waitForAckLatch.get().await(200, TimeUnit.MICROSECONDS); + ackLock.lock(); + try{ + ackCondition.await(200, TimeUnit.MICROSECONDS); + }finally{ + ackLock.unlock(); + } } Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2011-08-13 00:24:18 UTC (rev 63) +++ udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2011-08-16 17:56:07 UTC (rev 64) @@ -61,16 +61,16 @@ } void decode(byte[]data){ + ackSequenceNumber=PacketUtil.decode(data, 0); } public boolean forSender(){ return false; } - private static final byte[]empty=new byte[0]; @Override public byte[] encodeControlInformation(){ - return empty; + return PacketUtil.encode(ackSequenceNumber); } } Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-08-13 00:24:18 UTC (rev 63) +++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-08-16 17:56:07 UTC (rev 64) @@ -54,7 +54,7 @@ try{ long seq=data.getSequenceNumber(); //if already have this chunk, discard it - if(SequenceNumber.compare(seq, initialSequenceNumber)<0)return true; + if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0)return true; //else compute insert position int offset=(int)SequenceNumber.seqOffset(initialSequenceNumber, seq); int insert=offset% size; @@ -120,6 +120,7 @@ } else return null; } + numValidChunks.decrementAndGet(); return r; } Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-08-13 00:24:18 UTC (rev 63) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-08-16 17:56:07 UTC (rev 64) @@ -5,6 +5,7 @@ import java.security.MessageDigest; import java.text.NumberFormat; import java.util.Random; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -21,10 +22,10 @@ boolean running=false; //how many - int num_packets=100; + int num_packets=500; //how large is a single packet - int size=1*1024*1024; + int size=20*1024*1024; int TIMEOUT=Integer.MAX_VALUE; @@ -36,7 +37,12 @@ // System.setProperty("udt.sender.storeStatistics","true"); UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; - doTest(); + try{ + doTest(); + }catch(TimeoutException te){ + te.printStackTrace(); + fail(); + } } private final NumberFormat format=NumberFormat.getNumberInstance(); @@ -59,6 +65,7 @@ long start=System.currentTimeMillis(); System.out.println("Sending <"+num_packets+"> packets of <"+format.format(size/1024.0/1024.0)+"> Mbytes each"); long end=0; + if(serverRunning){ for(int i=0;i<num_packets;i++){ long block=System.currentTimeMillis(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |