[Udt-java-commits] SF.net SVN: udt-java:[54] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2011-02-17 21:24:39
|
Revision: 54
http://udt-java.svn.sourceforge.net/udt-java/?rev=54&view=rev
Author: bschuller
Date: 2011-02-17 21:24:32 +0000 (Thu, 17 Feb 2011)
Log Message:
-----------
bit of refactoring of sender to avoid memory allocations for data and data packets
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/UDTSession.java
udt-java/trunk/src/main/java/udt/UDTSocket.java
udt-java/trunk/src/main/java/udt/packets/DataPacket.java
udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
Added Paths:
-----------
udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
udt-java/trunk/src/test/java/udt/sender/
udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -168,6 +168,7 @@
}
};
Thread t=UDTThreadFactory.get().newThread(receive);
+ t.setName("UDPEndpoint-"+t.getName());
t.setDaemon(true);
t.start();
logger.info("UDTEndpoint started.");
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -214,6 +214,8 @@
}
};
receiverThread=UDTThreadFactory.get().newThread(r);
+ String s=(session instanceof ServerSession)? "ServerSession": "ClientSession";
+ receiverThread.setName("UDTReceiver-"+s+"-"+receiverThread.getName());
receiverThread.start();
}
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -33,9 +33,8 @@
package udt;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -49,6 +48,7 @@
import udt.packets.DataPacket;
import udt.packets.KeepAlive;
import udt.packets.NegativeAcknowledgement;
+import udt.sender.FlowWindow;
import udt.sender.SenderLossList;
import udt.util.MeanThroughput;
import udt.util.MeanValue;
@@ -76,13 +76,12 @@
//senderLossList stores the sequence numbers of lost packets
//fed back by the receiver through NAK pakets
private final SenderLossList senderLossList;
-
+
//sendBuffer stores the sent data packets and their sequence numbers
- private final Map<Long,DataPacket>sendBuffer;
-
- //sendQueue contains the packets to send
- private final BlockingQueue<DataPacket>sendQueue;
-
+ private final Map<Long,byte[]>sendBuffer;
+
+ private final FlowWindow flowWindow;
+
//thread reading packets from send queue and sending them
private Thread senderThread;
@@ -117,15 +116,18 @@
private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>();
private final boolean storeStatistics;
-
+
+ private final int chunksize;
+
public UDTSender(UDTSession session,UDPEndPoint endpoint){
if(!session.isReady())throw new IllegalStateException("UDTSession is not ready.");
this.endpoint= endpoint;
this.session=session;
statistics=session.getStatistics();
senderLossList=new SenderLossList();
- sendBuffer=new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(),0.75f,2);
- sendQueue = new ArrayBlockingQueue<DataPacket>(session.getFlowWindowSize(), /*fairness*/ true);
+ sendBuffer=new ConcurrentHashMap<Long, byte[]>(session.getFlowWindowSize(),0.75f,2);
+ chunksize=session.getDatagramSize()-24;//need space for the header;
+ flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize);
lastAckSequenceNumber=session.getInitialSequenceNumber();
currentSequenceNumber=session.getInitialSequenceNumber()-1;
waitForAckLatch.set(new CountDownLatch(1));
@@ -179,16 +181,14 @@
}
};
senderThread=UDTThreadFactory.get().newThread(r);
+ String s=(session instanceof ServerSession)? "ServerSession": "ClientSession";
+ senderThread.setName("UDTSender-"+s+"-"+senderThread.getName());
senderThread.start();
}
/**
* sends the given data packet, storing the relevant information
- *
- * @param data
- * @throws IOException
- * @throws InterruptedException
*/
private void send(DataPacket p)throws IOException{
synchronized(sendLock){
@@ -203,28 +203,63 @@
throughput.end();
throughput.begin();
}
- sendBuffer.put(p.getPacketSequenceNumber(), p);
+ sendBuffer.put(p.getPacketSequenceNumber(), p.getData());
unacknowledged.incrementAndGet();
}
statistics.incNumberOfSentDataPackets();
}
+ protected void sendUdtPacket(ByteBuffer bb, int timeout, TimeUnit units)throws IOException, InterruptedException{
+ if(!started)start();
+ DataPacket packet=null;
+ do{
+ packet=flowWindow.getForProducer();
+ if(packet==null){
+ Thread.sleep(10);
+ }
+ }while(packet==null);//TODO check timeout...
+ try{
+ packet.setPacketSequenceNumber(getNextSequenceNumber());
+ packet.setSession(session);
+ packet.setDestinationID(session.getDestination().getSocketID());
+ int len=Math.min(bb.remaining(),chunksize);
+ byte[] data=packet.getData();
+ bb.get(data,0,len);
+ packet.setLength(len);
+ }finally{
+ flowWindow.produce();
+ }
+
+ }
+
/**
- * writes a data packet into the sendQueue, waiting at most for the specified time
+ * writes a data packet, waiting at most for the specified time
* if this is not possible due to a full send queue
*
- * @return <code>true</code>if the packet was added, <code>false</code> if the
- * packet could not be added because the queue was full
- * @param p
* @param timeout
* @param units
* @return
* @throws IOException
* @throws InterruptedException
*/
- protected boolean sendUdtPacket(DataPacket p, int timeout, TimeUnit units)throws IOException,InterruptedException{
+ protected void sendUdtPacket(byte[]data, int timeout, TimeUnit units)throws IOException, InterruptedException{
if(!started)start();
- return sendQueue.offer(p,timeout,units);
+ DataPacket packet=null;
+ do{
+ packet=flowWindow.getForProducer();
+ if(packet==null){
+ Thread.sleep(10);
+ // System.out.println("queue full: "+flowWindow);
+ }
+ }while(packet==null);
+ try{
+ packet.setPacketSequenceNumber(getNextSequenceNumber());
+ packet.setSession(session);
+ packet.setDestinationID(session.getDestination().getSocketID());
+ packet.setData(data);
+ }finally{
+ flowWindow.produce();
+ }
}
//receive a packet from server from the peer
@@ -268,6 +303,7 @@
for(long s=lastAckSequenceNumber;s<ackNumber;s++){
synchronized (sendLock) {
removed=sendBuffer.remove(s)!=null;
+ senderLossList.remove(s);
}
if(removed){
unacknowledged.decrementAndGet();
@@ -291,7 +327,7 @@
session.getCongestionControl().onLoss(nak.getDecodedLossInfo());
session.getSocket().getReceiver().resetEXPTimer();
statistics.incNumberOfNAKReceived();
-
+
if(logger.isLoggable(Level.FINER)){
logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, "
+"set send period to "+session.getCongestionControl().getSendInterval());
@@ -322,13 +358,11 @@
public void senderAlgorithm()throws InterruptedException, IOException{
while(!paused){
iterationStart=Util.getCurrentTime();
-
//if the sender's loss list is not empty
- if (!senderLossList.isEmpty()) {
- Long entry=senderLossList.getFirstEntry();
- handleResubmit(entry);
+ Long entry=senderLossList.getFirstEntry();
+ if(entry!=null){
+ handleRetransmit(entry);
}
-
else
{
//if the number of unacknowledged data packets does not exceed the congestion
@@ -336,9 +370,9 @@
int unAcknowledged=unacknowledged.get();
if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
- && unAcknowledged<session.getFlowWindowSize()){
+ && unAcknowledged<session.getFlowWindowSize()){
//check for application data
- DataPacket dp=sendQueue.poll();
+ DataPacket dp=flowWindow.consumeData();
if(dp!=null){
send(dp);
largestSentSequenceNumber=dp.getPacketSequenceNumber();
@@ -374,15 +408,21 @@
}
/**
- * re-submits an entry from the sender loss list
+ * re-transmit an entry from the sender loss list
* @param entry
*/
- protected void handleResubmit(Long seqNumber){
+ protected void handleRetransmit(Long seqNumber){
try {
//retransmit the packet and remove it from the list
- DataPacket pktToRetransmit = sendBuffer.get(seqNumber);
- if(pktToRetransmit!=null){
- endpoint.doSend(pktToRetransmit);
+ byte[]data=sendBuffer.get(seqNumber);
+ if(data!=null){
+ //System.out.println("re-transmit "+data);
+ DataPacket packet=new DataPacket();
+ packet.setPacketSequenceNumber(seqNumber);
+ packet.setSession(session);
+ packet.setDestinationID(session.getDestination().getSocketID());
+ packet.setData(data);
+ endpoint.doSend(packet);
statistics.incNumberOfRetransmittedDataPackets();
}
}catch (Exception e) {
@@ -457,14 +497,14 @@
*/
public void waitForAck()throws InterruptedException{
waitForAckLatch.set(new CountDownLatch(1));
- waitForAckLatch.get().await(2, TimeUnit.MILLISECONDS);
+ waitForAckLatch.get().await(200, TimeUnit.MICROSECONDS);
}
public void stop(){
stopped=true;
}
-
+
public void pause(){
startLatch=new CountDownLatch(1);
paused=true;
Modified: udt-java/trunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSession.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/UDTSession.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -74,7 +74,7 @@
* flow window size, i.e. how many data packets are
* in-flight at a single time
*/
- protected int flowWindowSize=1024;
+ protected int flowWindowSize=1024*10;
/**
* remote UDT entity (address and socket ID)
Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSocket.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -36,46 +36,41 @@
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
-
-import udt.packets.DataPacket;
-
/**
* UDTSocket is analogous to a normal java.net.Socket, it provides input and
* output streams for the application
*
* TODO is it possible to actually extend java.net.Socket ?
*
- *
*/
public class UDTSocket {
-
+
//endpoint
private final UDPEndPoint endpoint;
-
+
private volatile boolean active;
-
- //processing received data
+
+ //processing received data
private UDTReceiver receiver;
private UDTSender sender;
-
+
private final UDTSession session;
private UDTInputStream inputStream;
private UDTOutputStream outputStream;
-
/**
- * @param host
- * @param port
- * @param endpoint
- * @throws SocketException,UnknownHostException
- */
+ * @param host
+ * @param port
+ * @param endpoint
+ * @throws SocketException,UnknownHostException
+ */
public UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{
this.endpoint=endpoint;
this.session=session;
this.receiver=new UDTReceiver(session,endpoint);
this.sender=new UDTSender(session,endpoint);
}
-
+
public UDTReceiver getReceiver() {
return receiver;
}
@@ -114,7 +109,7 @@
}
return inputStream;
}
-
+
/**
* get the output stream for writing to this socket
* @return
@@ -125,20 +120,20 @@
}
return outputStream;
}
-
+
public final UDTSession getSession(){
return session;
}
-
+
/**
* write single block of data without waiting for any acknowledgement
* @param data
*/
protected void doWrite(byte[]data)throws IOException{
doWrite(data, 0, data.length);
-
+
}
-
+
/**
* write the given data
* @param data - the data array
@@ -148,14 +143,14 @@
*/
protected void doWrite(byte[]data, int offset, int length)throws IOException{
try{
- doWrite(data, offset, length, Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
+ doWrite(data, offset, length, 10, TimeUnit.MILLISECONDS);
}catch(InterruptedException ie){
IOException io=new IOException();
io.initCause(ie);
throw io;
}
}
-
+
/**
* write the given data, waiting at most for the specified time if the queue is full
* @param data
@@ -167,26 +162,17 @@
* @throws InterruptedException
*/
protected void doWrite(byte[]data, int offset, int length, int timeout, TimeUnit units)throws IOException,InterruptedException{
- int chunksize=session.getDatagramSize()-24;//need some bytes for the header
ByteBuffer bb=ByteBuffer.wrap(data,offset,length);
- long seqNo=0;
while(bb.remaining()>0){
- int len=Math.min(bb.remaining(),chunksize);
- byte[]chunk=new byte[len];
- bb.get(chunk);
- DataPacket packet=new DataPacket();
- seqNo=sender.getNextSequenceNumber();
- packet.setPacketSequenceNumber(seqNo);
- packet.setSession(session);
- packet.setDestinationID(session.getDestination().getSocketID());
- packet.setData(chunk);
- //put the packet into the send queue
- if(!sender.sendUdtPacket(packet, timeout, units)){
- throw new IOException("Queue full");
+ try{
+ sender.sendUdtPacket(bb, timeout, units);
+ }catch(Exception ex){
+ ex.printStackTrace();
}
}
if(length>0)active=true;
}
+
/**
* will block until the outstanding packets have really been sent out
* and acknowledged
@@ -207,13 +193,13 @@
//TODO need to check if we can pause the sender...
//sender.pause();
}
-
+
//writes and wait for ack
protected void doWriteBlocking(byte[]data)throws IOException, InterruptedException{
doWrite(data);
flush();
}
-
+
/**
* close the connection
* @throws IOException
Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -44,12 +44,15 @@
private long destinationID;
private UDTSession session;
+
+ private int dataLength;
public DataPacket(){
}
/**
- * create a DataPacket
+ * create a DataPacket from the given raw data
+ *
* @param encodedData - network data
*/
public DataPacket(byte[] encodedData){
@@ -58,6 +61,7 @@
public DataPacket(byte[] encodedData, int length){
decode(encodedData,length);
+ dataLength=length;
}
void decode(byte[]encodedData,int length){
@@ -75,16 +79,16 @@
}
public double getLength(){
- return data.length;
+ return dataLength;
}
- /*
- * aplivation data
- * @param
- */
-
+ public void setLength(int length){
+ dataLength=length;
+ }
+
public void setData(byte[] data) {
this.data = data;
+ dataLength=data.length;
}
public long getPacketSequenceNumber() {
@@ -125,12 +129,12 @@
*/
public byte[] getEncoded(){
//header.length is 16
- byte[] result=new byte[16+data.length];
+ byte[] result=new byte[16+dataLength];
System.arraycopy(PacketUtil.encode(packetSequenceNumber), 0, result, 0, 4);
System.arraycopy(PacketUtil.encode(messageNumber), 0, result, 4, 4);
System.arraycopy(PacketUtil.encode(timeStamp), 0, result, 8, 4);
System.arraycopy(PacketUtil.encode(destinationID), 0, result, 12, 4);
- System.arraycopy(data, 0, result, 16, data.length);
+ System.arraycopy(data, 0, result, 16, dataLength);
return result;
}
Modified: udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -39,12 +39,17 @@
*/
public class PacketHistoryWindow extends CircularArray<Long>{
+ private final long[]intervals;
+ private final int num;
+
/**
* create a new PacketHistoryWindow of the given size
* @param size
*/
public PacketHistoryWindow(int size){
super(size);
+ num=max-1;
+ intervals=new long[num];
}
/**
@@ -54,12 +59,11 @@
*/
public long getPacketArrivalSpeed(){
if(!haveOverflow)return 0;
- int num=max-1;
+
double AI;
double medianPacketArrivalSpeed;
double total=0;
int count=0;
- long[]intervals=new long[num];
int pos=position-1;
if(pos<0)pos=num;
do{
Added: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java (rev 0)
+++ udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -0,0 +1,139 @@
+package udt.sender;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import udt.packets.DataPacket;
+
+/**
+ *
+ * holds a fixed number of {@link DataPacket} instances which are sent out.
+ *
+ * it is assumed that a single thread stores new data, and another single thread
+ * reads/removes data
+ *
+ * @author schuller
+ */
+public class FlowWindow {
+
+ private final DataPacket[]packets;
+
+ private final int length;
+
+ private volatile boolean isEmpty=true;
+
+ private volatile boolean isFull=false;
+
+ private volatile int validEntries=0;
+
+ private volatile boolean isCheckout=false;
+
+ private volatile int writePos=0;
+
+ private volatile int readPos=-1;
+
+ private volatile int consumed=0;
+
+ private volatile int produced=0;
+
+ private final ReentrantLock lock;
+
+ /**
+ * @param size - flow window size
+ * @param chunksize - data chunk size
+ */
+ public FlowWindow(int size, int chunksize){
+ this.length=size;
+ packets=new DataPacket[length];
+ for(int i=0;i<packets.length;i++){
+ packets[i]=new DataPacket();
+ packets[i].setData(new byte[chunksize]);
+ }
+ lock=new ReentrantLock(true);
+ }
+
+ /**
+ * get a data packet for updating with new data
+ *
+ * @return <code>null</code> if flow window is full
+ */
+ public DataPacket getForProducer(){
+ lock.lock();
+ try{
+ if(isFull){
+ return null;
+ }
+ if(isCheckout)throw new IllegalStateException();
+ isCheckout=true;
+ DataPacket p=packets[writePos];
+ return p;
+ }finally{
+ lock.unlock();
+ }
+ }
+
+ public void produce(){
+ lock.lock();
+ try{
+ isCheckout=false;
+ writePos++;
+ if(writePos==length)writePos=0;
+ validEntries++;
+ isFull=validEntries==length-1;
+ isEmpty=false;
+ produced++;
+ }finally{
+ lock.unlock();
+ }
+ }
+
+
+ public DataPacket consumeData(){
+ if(isEmpty){
+ return null;
+ }
+ lock.lock();
+ try{
+ readPos++;
+ DataPacket p=packets[readPos];
+ if(readPos==length-1)readPos=-1;
+ validEntries--;
+ isEmpty=validEntries==0;
+ isFull=false;
+ consumed++;
+ return p;
+ }finally{
+ lock.unlock();
+ }
+ }
+
+ boolean isEmpty(){
+ return isEmpty;
+ }
+
+ /**
+ * check if the window is full, i.e. no further entry can be added
+ * @return
+ */
+ public boolean isFull(){
+ return isFull;
+ }
+
+ int readPos(){
+ return readPos;
+ }
+
+ int writePos(){
+ return writePos;
+ }
+
+ int consumed(){
+ return consumed;
+ }
+ public String toString(){
+ StringBuilder sb=new StringBuilder();
+ sb.append("FlowWindow size=").append(length);
+ sb.append(" full=").append(isFull).append(" empty=").append(isEmpty);
+ sb.append(" consumed=").append(consumed).append(" produced=").append(produced);
+ return sb.toString();
+ }
+}
Property changes on: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -33,8 +33,6 @@
package udt.sender;
import java.util.LinkedList;
-import udt.util.MeanValue;
-
/**
* stores the sequence number of the lost packets in increasing order
*/
@@ -57,14 +55,19 @@
backingList.add(i,obj);
return;
}
- else if(obj==entry)return;
+ else if(obj.equals(entry))return;
}
backingList.add(obj);
}
}
+ public void remove(Long obj){
+ synchronized (backingList) {
+ backingList.remove(obj);
+ }
+ }
/**
- * retrieves the loss list entry with the lowest sequence number
+ * retrieves the loss list entry with the lowest sequence number, or <code>null</code> if loss list is empty
*/
public Long getFirstEntry(){
synchronized(backingList){
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -109,7 +109,6 @@
while(true)Thread.sleep(10000);
}
-
File file=new File(new String(localFile));
System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">");
FileOutputStream fos=new FileOutputStream(file);
Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -188,7 +188,7 @@
sb.append("Duplicate data packets: ").append(getNumberOfDuplicateDataPackets()).append("\n");
sb.append("ACK received: ").append(getNumberOfACKReceived()).append("\n");
sb.append("NAK received: ").append(getNumberOfNAKReceived()).append("\n");
- sb.append("Retransmitted data: ").append(getNumberOfNAKReceived()).append("\n");
+ sb.append("Retransmitted data: ").append(getNumberOfRetransmittedDataPackets()).append("\n");
sb.append("NAK sent: ").append(getNumberOfNAKSent()).append("\n");
sb.append("ACK sent: ").append(getNumberOfACKSent()).append("\n");
if(roundTripTime>0){
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -21,7 +21,7 @@
boolean running=false;
//how many
- int num_packets=300;
+ int num_packets=100;
//how large is a single packet
int size=1*1024*1024;
@@ -55,7 +55,7 @@
new Random().nextBytes(data);
MessageDigest digest=MessageDigest.getInstance("MD5");
- while(!serverRunning)Thread.sleep(100);
+ while(!serverStarted)Thread.sleep(100);
long start=System.currentTimeMillis();
System.out.println("Sending <"+num_packets+"> packets of <"+format.format(size/1024.0/1024.0)+"> Mbytes each");
long end=0;
@@ -101,6 +101,7 @@
long total=0;
volatile boolean serverRunning=true;
+ volatile boolean serverStarted=false;
volatile String md5_received=null;
@@ -110,6 +111,7 @@
Runnable serverProcess=new Runnable(){
public void run(){
+
try{
Boolean devNull=Boolean.getBoolean("udt.dev.null");
if(devNull){
@@ -118,6 +120,7 @@
MessageDigest md5=MessageDigest.getInstance("MD5");
long start=System.currentTimeMillis();
UDTSocket s=serverSocket.accept();
+ serverStarted=true;
assertNotNull(s);
UDTInputStream is=s.getInputStream();
byte[]buf=new byte[READ_BUFFERSIZE];
Added: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
===================================================================
--- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java (rev 0)
+++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -0,0 +1,151 @@
+package udt.sender;
+
+import java.util.concurrent.TimeoutException;
+
+import junit.framework.TestCase;
+import udt.packets.DataPacket;
+
+public class TestFlowWindow extends TestCase {
+
+ public void testFillWindow()throws InterruptedException, TimeoutException{
+ FlowWindow fw=new FlowWindow(3, 128);
+ DataPacket p1=fw.getForProducer();
+ assertNotNull(p1);
+ fw.produce();
+ DataPacket p2=fw.getForProducer();
+ assertNotNull(p2);
+ fw.produce();
+ assertFalse(p1==p2);
+ DataPacket p3=fw.getForProducer();
+ assertNotNull(p3);
+ assertFalse(p1==p3);
+ assertFalse(p2==p3);
+ fw.produce();
+ assertTrue(fw.isFull());
+
+ DataPacket no=fw.getForProducer();
+ assertNull("Window should be full",no);
+
+ DataPacket c1=fw.consumeData();
+ //must be p1
+ assertTrue(c1==p1);
+ DataPacket c2=fw.consumeData();
+ //must be p2
+ assertTrue(c2==p2);
+ DataPacket c3=fw.consumeData();
+ //must be p3
+ assertTrue(c3==p3);
+ assertTrue(fw.isEmpty());
+ }
+
+ public void testOverflow()throws InterruptedException, TimeoutException{
+ FlowWindow fw=new FlowWindow(3, 64);
+ DataPacket p1=fw.getForProducer();
+ assertNotNull(p1);
+ fw.produce();
+ DataPacket p2=fw.getForProducer();
+ assertNotNull(p2);
+ fw.produce();
+ assertFalse(p1==p2);
+ DataPacket p3=fw.getForProducer();
+ assertNotNull(p3);
+ assertFalse(p1==p3);
+ assertFalse(p2==p3);
+ fw.produce();
+ assertTrue(fw.isFull());
+
+ //read one
+ DataPacket c1=fw.consumeData();
+ //must be p1
+ assertTrue(c1==p1);
+ assertFalse(fw.isFull());
+
+ //now a slot for writing should be free again
+ DataPacket p4=fw.getForProducer();
+ assertNotNull(p4);
+ fw.produce();
+ //which is again p1
+ assertTrue(p4==p1);
+
+ }
+
+ private volatile boolean fail=false;
+
+ public void testConcurrentReadWrite()throws InterruptedException{
+ final FlowWindow fw=new FlowWindow(20, 64);
+ Thread reader=new Thread(new Runnable(){
+ public void run(){
+ doRead(fw);
+ }
+ });
+ reader.setName("reader");
+ Thread writer=new Thread(new Runnable(){
+ public void run(){
+ doWrite(fw);
+ }
+ });
+ writer.setName("writer");
+
+ writer.start();
+ reader.start();
+
+ int c=0;
+ while(read && write && c<10){
+ Thread.sleep(1000);
+ c++;
+ }
+ assertFalse("An error occured in reader or writer",fail);
+
+ }
+
+ volatile boolean read=true;
+ volatile boolean write=true;
+ int N=100000;
+
+ private void doRead(final FlowWindow fw){
+ System.out.println("Starting reader...");
+ try{
+ for(int i=0;i<N;i++){
+ DataPacket p=null;
+ while( (p=fw.consumeData())==null){
+ Thread.sleep(1);
+ }
+ synchronized (p) {
+ assertEquals(i,p.getMessageNumber());
+ }
+ }
+ }catch(Throwable ex){
+ ex.printStackTrace();
+ fail=true;
+ }
+ System.out.println("Exiting reader...");
+ read=false;
+ }
+
+ private void doWrite(final FlowWindow fw){
+ System.out.println("Starting writer...");
+ DataPacket p=null;
+ try{
+ for(int i=0;i<N;i++){
+ p=null;
+ do{
+ p=fw.getForProducer();
+ if(p!=null){
+ synchronized(p){
+ p.setData(("test"+i).getBytes());
+ p.setMessageNumber(i);
+ fw.produce();
+ }
+ }
+ }while(p==null);
+ }
+ }catch(Exception ex){
+ ex.printStackTrace();
+ System.out.println("ERROR****");
+ fail=true;
+ }
+ System.out.println("Exiting writer...");
+ write=false;
+ }
+
+}
Property changes on: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|