[Udt-java-commits] SF.net SVN: udt-java:[64] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2011-08-16 17:56:14
|
Revision: 64
http://udt-java.svn.sourceforge.net/udt-java/?rev=64&view=rev
Author: bschuller
Date: 2011-08-16 17:56:07 +0000 (Tue, 16 Aug 2011)
Log Message:
-----------
fix two bugs: thanks to ajsenf (Alexander Senf)
see https://sourceforge.net/projects/udt-java/forums/forum/1109269/topic/4615162?message=10597365
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTClient.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java
udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
Modified: udt-java/trunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTClient.java 2011-08-13 00:24:18 UTC (rev 63)
+++ udt-java/trunk/src/main/java/udt/UDTClient.java 2011-08-16 17:56:07 UTC (rev 64)
@@ -36,6 +36,7 @@
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -115,15 +116,15 @@
}
/**
- * flush outstanding data (and make sure it is acknowledged)
+ * flush outstanding data, with the specified maximum waiting time
+ * @param timeOut - timeout in millis (if smaller than 0, no timeout is used)
* @throws IOException
* @throws InterruptedException
*/
- public void flush()throws IOException, InterruptedException{
+ public void flush()throws IOException, InterruptedException, TimeoutException{
clientSession.getSocket().flush();
}
-
public void shutdown()throws IOException{
if (clientSession.isReady()&& clientSession.active==true)
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2011-08-13 00:24:18 UTC (rev 63)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-08-16 17:56:07 UTC (rev 64)
@@ -39,7 +39,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -110,11 +111,9 @@
private volatile CountDownLatch startLatch=new CountDownLatch(1);
//used by the sender to wait for an ACK
- private final AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>();
+ private final ReentrantLock ackLock=new ReentrantLock();
+ private final Condition ackCondition=ackLock.newCondition();
- //used by the sender to wait for an ACK of a certain sequence number
- private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>();
-
private final boolean storeStatistics;
private final int chunksize;
@@ -130,8 +129,6 @@
flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize);
lastAckSequenceNumber=session.getInitialSequenceNumber();
currentSequenceNumber=session.getInitialSequenceNumber()-1;
- waitForAckLatch.set(new CountDownLatch(1));
- waitForSeqAckLatch.set(new CountDownLatch(1));
storeStatistics=Boolean.getBoolean("udt.sender.storeStatistics");
initMetrics();
doStart();
@@ -278,8 +275,9 @@
}
protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{
- waitForAckLatch.get().countDown();
- waitForSeqAckLatch.get().countDown();
+ ackLock.lock();
+ ackCondition.signal();
+ ackLock.unlock();
CongestionControl cc=session.getCongestionControl();
long rtt=acknowledgement.getRoundTripTime();
@@ -407,6 +405,8 @@
}
}
+ private final DataPacket retransmit=new DataPacket();
+
/**
* re-transmit an entry from the sender loss list
* @param entry
@@ -416,13 +416,11 @@
//retransmit the packet and remove it from the list
byte[]data=sendBuffer.get(seqNumber);
if(data!=null){
- //System.out.println("re-transmit "+data);
- DataPacket packet=new DataPacket();
- packet.setPacketSequenceNumber(seqNumber);
- packet.setSession(session);
- packet.setDestinationID(session.getDestination().getSocketID());
- packet.setData(data);
- endpoint.doSend(packet);
+ retransmit.setPacketSequenceNumber(seqNumber);
+ retransmit.setSession(session);
+ retransmit.setDestinationID(session.getDestination().getSocketID());
+ retransmit.setData(data);
+ endpoint.doSend(retransmit);
statistics.incNumberOfRetransmittedDataPackets();
}
}catch (Exception e) {
@@ -486,18 +484,37 @@
*/
public void waitForAck(long sequenceNumber)throws InterruptedException{
while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){
- waitForSeqAckLatch.set(new CountDownLatch(1));
- waitForSeqAckLatch.get().await(10, TimeUnit.MILLISECONDS);
+ ackLock.lock();
+ try{
+ ackCondition.await(100, TimeUnit.MICROSECONDS);
+ }finally{
+ ackLock.unlock();
+ }
}
}
+ public void waitForAck(long sequenceNumber, int timeout)throws InterruptedException{
+ while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){
+ ackLock.lock();
+ try{
+ ackCondition.await(timeout, TimeUnit.MILLISECONDS);
+ }finally{
+ ackLock.unlock();
+ }
+ }
+ }
+
/**
* wait for the next acknowledge
* @throws InterruptedException
*/
public void waitForAck()throws InterruptedException{
- waitForAckLatch.set(new CountDownLatch(1));
- waitForAckLatch.get().await(200, TimeUnit.MICROSECONDS);
+ ackLock.lock();
+ try{
+ ackCondition.await(200, TimeUnit.MICROSECONDS);
+ }finally{
+ ackLock.unlock();
+ }
}
Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2011-08-13 00:24:18 UTC (rev 63)
+++ udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2011-08-16 17:56:07 UTC (rev 64)
@@ -61,16 +61,16 @@
}
void decode(byte[]data){
+ ackSequenceNumber=PacketUtil.decode(data, 0);
}
public boolean forSender(){
return false;
}
- private static final byte[]empty=new byte[0];
@Override
public byte[] encodeControlInformation(){
- return empty;
+ return PacketUtil.encode(ackSequenceNumber);
}
}
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-08-13 00:24:18 UTC (rev 63)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-08-16 17:56:07 UTC (rev 64)
@@ -54,7 +54,7 @@
try{
long seq=data.getSequenceNumber();
//if already have this chunk, discard it
- if(SequenceNumber.compare(seq, initialSequenceNumber)<0)return true;
+ if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0)return true;
//else compute insert position
int offset=(int)SequenceNumber.seqOffset(initialSequenceNumber, seq);
int insert=offset% size;
@@ -120,6 +120,7 @@
}
else return null;
}
+ numValidChunks.decrementAndGet();
return r;
}
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-08-13 00:24:18 UTC (rev 63)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-08-16 17:56:07 UTC (rev 64)
@@ -5,6 +5,7 @@
import java.security.MessageDigest;
import java.text.NumberFormat;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -21,10 +22,10 @@
boolean running=false;
//how many
- int num_packets=100;
+ int num_packets=500;
//how large is a single packet
- int size=1*1024*1024;
+ int size=20*1024*1024;
int TIMEOUT=Integer.MAX_VALUE;
@@ -36,7 +37,12 @@
// System.setProperty("udt.sender.storeStatistics","true");
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
- doTest();
+ try{
+ doTest();
+ }catch(TimeoutException te){
+ te.printStackTrace();
+ fail();
+ }
}
private final NumberFormat format=NumberFormat.getNumberInstance();
@@ -59,6 +65,7 @@
long start=System.currentTimeMillis();
System.out.println("Sending <"+num_packets+"> packets of <"+format.format(size/1024.0/1024.0)+"> Mbytes each");
long end=0;
+
if(serverRunning){
for(int i=0;i<num_packets;i++){
long block=System.currentTimeMillis();
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|