[Udt-java-commits] SF.net SVN: udt-java:[18] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2010-04-16 21:05:20
|
Revision: 18
http://udt-java.svn.sourceforge.net/udt-java/?rev=18&view=rev
Author: bschuller
Date: 2010-04-16 21:05:13 +0000 (Fri, 16 Apr 2010)
Log Message:
-----------
cc made configurable; tried to optimize loss lists
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
udt-java/trunk/src/main/java/udt/UDTInputStream.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/UDTSession.java
udt-java/trunk/src/main/java/udt/UDTSocket.java
udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java
udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java
udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java
udt-java/trunk/src/main/java/udt/util/CircularArray.java
udt-java/trunk/src/main/java/udt/util/FlowWindow.java
udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
udt-java/trunk/src/test/java/udt/performance/UDPTest.java
Added Paths:
-----------
udt-java/trunk/src/main/java/udt/CongestionControl.java
udt-java/trunk/src/test/java/udt/NullCongestionControl.java
Added: udt-java/trunk/src/main/java/udt/CongestionControl.java
===================================================================
--- udt-java/trunk/src/main/java/udt/CongestionControl.java (rev 0)
+++ udt-java/trunk/src/main/java/udt/CongestionControl.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -0,0 +1,74 @@
+package udt;
+
+import java.util.List;
+
+/**
+ * congestion control interface
+ */
+public interface CongestionControl {
+
+ /**
+ * Callback function to be called (only) at the start of a UDT connection.
+ * when the UDT socket is connected
+ */
+ public abstract void init();
+
+ /**
+ * set roundtrip time and associated variance
+ */
+ public abstract void setRTT(long rtt, long rttVar);
+
+ /**
+ * set packet arrival rate and link capacity
+ * @param rate
+ * @param linkCapacity
+ */
+ public abstract void setPacketArrivalRate(long rate, long linkCapacity);
+
+ /**
+ * Inter-packet interval in seconds
+ * @return
+ */
+ public abstract double getSendInterval();
+
+ /**
+ * get the congestion window size
+ */
+ public abstract long getCongestionWindowSize();
+
+ /**
+ * Callback function to be called when an ACK packet is received.
+ * @param ackSeqno: the data sequence number acknowledged by this ACK.
+ * see spec. page(16-17)
+ */
+ public abstract void onACK(long ackSeqno);
+
+ /**
+ * Callback function to be called when a loss report is received.
+ * @param lossInfo: list of sequence numbers of packets, in the format described in packet.cpp.
+ */
+ public abstract void onNAK(List<Integer> lossInfo);
+
+ /**
+ * Callback function to be called when a timeout event occurs
+ */
+ public abstract void onTimeout();
+
+ /**
+ * Callback function to be called when data is sent.
+ * @param packetSeqNo: the data sequence number.
+ */
+ public abstract void onPacketSend(long packetSeqNo);
+
+ /**
+ * Callback function to be called when data is received.
+ * @param packetSeqNo: the data sequence number.
+ */
+ public abstract void onPacketReceive(long packetSeqNo);
+
+ /**
+ * Callback function to be called when a UDT connection is closed.
+ */
+ public abstract void close();
+
+}
\ No newline at end of file
Property changes on: udt-java/trunk/src/main/java/udt/CongestionControl.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -55,8 +55,6 @@
/**
* the UDPEndpoint takes care of sending and receiving UDP network packets,
* dispatching them to the correct {@link UDTSession}
- *
- *
*/
public class UDPEndPoint {
@@ -83,7 +81,7 @@
//has the endpoint been stopped?
private volatile boolean stopped=false;
- public static final int DATAGRAM_SIZE=32768;
+ public static final int DATAGRAM_SIZE=1500;
/**
* bind to any local port on the given host address
Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -2,12 +2,10 @@
import java.util.List;
-import javax.swing.text.Utilities;
-
import udt.util.UDTStatistics;
import udt.util.Util;
-public class UDTCongestionControl {
+public class UDTCongestionControl implements CongestionControl {
private final UDTSession session;
private final UDTStatistics statistics;
@@ -64,26 +62,30 @@
init();
}
- /**
- * Callback function to be called (only) at the start of a UDT connection.
- * when the UDT socket is conected
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#init()
*/
public void init() {
}
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#setRTT(long, long)
+ */
public void setRTT(long rtt, long rttVar){
this.roundTripTime=rtt;
}
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#setPacketArrivalRate(long, long)
+ */
public void setPacketArrivalRate(long rate, long linkCapacity){
this.packetArrivalRate=rate;
this.estimatedLinkCapacity=linkCapacity;
}
- /**
- * Inter-packet interval in seconds
- * @return
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#getSendInterval()
*/
public double getSendInterval(){
return packetSendingPeriod ;
@@ -93,14 +95,12 @@
* congestionWindowSize
* @return
*/
- protected long getCongestionWindowSize(){
- return congestionWindowSize;
+ public long getCongestionWindowSize(){
+ return 2048;//congestionWindowSize;
}
- /**
- * Callback function to be called when an ACK packet is received.
- * @param ackSeqno: the data sequence number acknowledged by this ACK.
- * see spec. page(16-17)
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#onACK(long)
*/
public void onACK(long ackSeqno){
//the fixed size of a UDT packet
@@ -143,9 +143,8 @@
return inc;
}
- /**
- * Callback function to be called when a loss report is received.
- * @param lossInfo:list of sequence number of packets, in the format describled in packet.cpp.
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#onNAK(java.util.List)
*/
public void onNAK(List<Integer>lossInfo){
long firstBiggestlossSeqNo=lossInfo.get(lossInfo.size()-1);
@@ -197,24 +196,22 @@
}
}
- /**
- * Callback function to be called when a timeout event occurs
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#onTimeout()
*/
public void onTimeout(){}
- /**
- * Callback function to be called when a data is sent.
- * @param packetSeqNo: the data sequence number.
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#onPacketSend(long)
*/
public void onPacketSend(long packetSeqNo){}
- /**
- * Callback function to be called when a data is received.
- * @param packetSeqNo: the data sequence number.
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#onPacketReceive(long)
*/
public void onPacketReceive(long packetSeqNo){}
- /**
- * Callback function to be called when a UDT connection is closed.
+ /* (non-Javadoc)
+ * @see udt.CongestionControl#close()
*/
public void close(){}
Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -60,7 +60,7 @@
private final UDTStatistics statistics;
//the highest sequence number read by the application
- private long highestSequenceNumber=-1;
+ private volatile long highestSequenceNumber=0;
//set to 'false' by the receiver when it gets a shutdown signal from the peer
//see the noMoreData() method
@@ -88,8 +88,8 @@
}
private int getFlowWindowSize(){
- if(socket!=null)return socket.getSession().getFlowWindowSize();
- else return 64;
+ if(socket!=null)return 2*socket.getSession().getFlowWindowSize();
+ else return 128;
}
/**
* create a new {@link UDTInputStream} connected to the given socket
@@ -221,6 +221,7 @@
*
*/
protected boolean haveNewData(long sequenceNumber,byte[]data)throws IOException{
+ if(sequenceNumber<=highestSequenceNumber)return true;
return appData.offer(new AppData(sequenceNumber,data));
}
@@ -270,6 +271,31 @@
public String toString(){
return sequenceNumber+"["+data.length+"]";
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + (int) (sequenceNumber ^ (sequenceNumber >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ AppData other = (AppData) obj;
+ if (sequenceNumber != other.sequenceNumber)
+ return false;
+ return true;
+ }
+
+
}
}
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -84,7 +84,7 @@
private long lastDataPacketArrivalTime=0;
//largest received data packet sequence number(LRSN)
- private volatile long largestReceivedSeqNumber=-1;
+ private volatile long largestReceivedSeqNumber=0;
//ACK event related
@@ -233,7 +233,6 @@
}
processUDTPacket(packet);
}
- //else System.out.println("no packet.");
Thread.yield();
}
@@ -337,7 +336,6 @@
protected void onDataPacketReceived(DataPacket dp)throws IOException{
long currentSequenceNumber = dp.getPacketSequenceNumber();
-
//check whether to drop this packet
n++;
if(dropRate>0 && n % dropRate == 0){
@@ -412,6 +410,7 @@
protected void sendNAK(List<Long>sequenceNumbers)throws IOException{
if(sequenceNumbers.size()==0)return;
NegativeAcknowledgement nAckPacket= new NegativeAcknowledgement();
+
nAckPacket.addLossInfo(sequenceNumbers);
nAckPacket.setSession(session);
nAckPacket.setDestinationID(session.getDestination().getSocketID());
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -40,6 +40,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -87,7 +88,7 @@
private final AtomicInteger unacknowledged=new AtomicInteger(0);
//for generating data packet sequence numbers
- private long nextSequenceNumber=-1;
+ private long nextSequenceNumber=0;
//the largest data packet sequence number that has actually been sent out
private volatile long largestSentSequenceNumber=-1;
@@ -100,6 +101,8 @@
private volatile boolean stopped=false;
+ private volatile AtomicReference<CountDownLatch> latchRef=new AtomicReference<CountDownLatch>();
+
public UDTSender(UDTSession session,UDPEndPoint endpoint){
this.endpoint= endpoint;
this.session=session;
@@ -110,6 +113,8 @@
sendBuffer=new ConcurrentHashMap<Long, DataPacket>(MAX_SIZE,0.75f,2);
sendQueue = new LinkedBlockingQueue<DataPacket>(MAX_SIZE);
lastAckSequenceNumber=session.getInitialSequenceNumber();
+
+ latchRef.set(new CountDownLatch(1));
start();
}
@@ -190,8 +195,8 @@
}
protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{
- if(latch!=null)latch.countDown();
- UDTCongestionControl cc=session.getCongestionControl();
+ latchRef.get().countDown();
+ CongestionControl cc=session.getCongestionControl();
if(acknowledgement.getPacketReceiveRate()>0){
long rtt=acknowledgement.getRoundTripTime();
long rttVar=acknowledgement.getRoundTripTimeVar();
@@ -230,8 +235,6 @@
for(Integer i: nak.getDecodedLossInfo()){
senderLossList.insert(new SenderLossListEntry(i));
}
- //update SND TODO
-
session.getCongestionControl().onNAK(nak.getDecodedLossInfo());
//reset EXP. EXP is in the receiver currently.... maybe move to SOCKET?
session.getSocket().getReceiver().resetEXPTimer();
@@ -297,7 +300,7 @@
statistics.incNumberOfCCSlowDownEvents();
return;
}
- DataPacket dp=sendQueue.poll(10,TimeUnit.MILLISECONDS);
+ DataPacket dp=sendQueue.poll(100,TimeUnit.MILLISECONDS);
if(dp!=null){
lastSentTime=Util.getCurrentTime();
send(dp);
@@ -309,8 +312,9 @@
statistics.incNumberOfCCWindowExceededEvents();
}
}
+ Thread.yield();
}
- Thread.yield();
+
}
/**
@@ -363,16 +367,14 @@
return senderLossList.isEmpty();
}
- private volatile CountDownLatch latch=null;
-
/**
* wait for the next acknowledge
* @throws InterruptedException
*/
- public void waitForAck(long sequenceNumber)throws InterruptedException{
- latch=new CountDownLatch(1);
+ public synchronized void waitForAck(long sequenceNumber)throws InterruptedException{
while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){
- latch.await(10, TimeUnit.MILLISECONDS);
+ latchRef.set(new CountDownLatch(1));
+ latchRef.get().await(10, TimeUnit.MILLISECONDS);
}
}
Modified: udt-java/trunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -62,13 +62,13 @@
protected int receiveBufferSize=64*32768;
- protected final UDTCongestionControl cc;
+ protected final CongestionControl cc;
/**
* flow window size, i.e. how many data packets are
* in-flight at a single time
*/
- protected int flowWindowSize=32;
+ protected int flowWindowSize=64;
/**
* remote UDT entity (address and socket ID)
@@ -84,6 +84,12 @@
public static final int DEFAULT_DATAGRAM_SIZE=UDPEndPoint.DATAGRAM_SIZE;
/**
+ * key for a system property defining the CC class to be used
+ * @see CongestionControl
+ */
+ public static final String CC_CLASS="udt.congestioncontrol.class";
+
+ /**
* Buffer size (i.e. datagram size)
* This is negotiated during connection setup
*/
@@ -99,7 +105,18 @@
statistics=new UDTStatistics(description);
mySocketID=nextSocketID.incrementAndGet();
this.destination=destination;
- cc=new UDTCongestionControl(this);
+
+ //init configurable CC
+ String clazzP=System.getProperty(CC_CLASS,UDTCongestionControl.class.getName());
+ Object ccObject=null;
+ try{
+ Class<?>clazz=Class.forName(clazzP);
+ ccObject=clazz.getDeclaredConstructor(UDTSession.class).newInstance(this);
+ }catch(Exception e){
+ ccObject=new UDTCongestionControl(this);
+ }
+ cc=(CongestionControl)ccObject;
+ System.out.println("using "+cc.getClass().getName());
}
public abstract void received(UDTPacket packet, Destination peer);
@@ -108,7 +125,7 @@
return socket;
}
- public UDTCongestionControl getCongestionControl() {
+ public CongestionControl getCongestionControl() {
return cc;
}
Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -182,7 +182,7 @@
packet.setData(chunk);
//put the packet into the send queue
while(!sender.sendUdtPacket(packet, timeout, units)){
- System.out.println("WAIT");
+ System.out.println("SOCKET WAIT");
}
}
if(length>0)active=true;
Modified: udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -109,7 +109,7 @@
*/
public void addLossInfo(long firstSequenceNumber, long lastSequenceNumber) {
//check if we really need an interval
- if(lastSequenceNumber-firstSequenceNumber==1){
+ if(lastSequenceNumber-firstSequenceNumber==0){
addLossInfo(firstSequenceNumber);
return;
}
Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossList.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -32,7 +32,7 @@
package udt.receiver;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
@@ -50,25 +50,18 @@
public ReceiverLossList(){
backingList = new PriorityBlockingQueue<ReceiverLossListEntry>(16);
- }
+ }
public void insert(ReceiverLossListEntry entry){
- backingList.add(entry);
+ synchronized (backingList) {
+ if(!backingList.contains(entry)){
+ backingList.add(entry);
+ }
+ }
}
-
- public void remove(ReceiverLossListEntry obj){
- backingList.remove(obj);
- }
public void remove(long seqNo){
- Iterator<ReceiverLossListEntry>iterator=backingList.iterator();
- while(iterator.hasNext()){
- ReceiverLossListEntry e=iterator.next();
- if(e.getSequenceNumber()==seqNo){
- iterator.remove();
- break;
- }
- }
+ backingList.remove(new ReceiverLossListEntry(seqNo));
}
public boolean contains(ReceiverLossListEntry obj){
@@ -102,8 +95,10 @@
public List<Long>getFilteredSequenceNumbers(long RTT, boolean doFeedback){
List<Long>result=new ArrayList<Long>();
long now=Util.getCurrentTime();
- for(ReceiverLossListEntry e: backingList){
- if( (now-e.getLastFeedbackTime())>2*RTT){
+ ReceiverLossListEntry[]sorted=backingList.toArray(new ReceiverLossListEntry[0]);
+ Arrays.sort(sorted);
+ for(ReceiverLossListEntry e: sorted){
+ if( (now-e.getLastFeedbackTime())>e.getK()*RTT){
result.add(e.getSequenceNumber());
if(doFeedback)e.feedback();
}
Modified: udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/receiver/ReceiverLossListEntry.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -48,7 +48,9 @@
* @param sequenceNumber
*/
public ReceiverLossListEntry(long sequenceNumber){
- if(sequenceNumber<=0)throw new IllegalArgumentException();
+ if(sequenceNumber<=0){
+ throw new IllegalArgumentException("Got sequence number "+sequenceNumber);
+ }
this.sequenceNumber = sequenceNumber;
this.lastFeedbacktime=Util.getCurrentTime();
}
@@ -90,4 +92,30 @@
return sequenceNumber+"[k="+k+",time="+lastFeedbacktime+"]";
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (k ^ (k >>> 32));
+ result = prime * result
+ + (int) (sequenceNumber ^ (sequenceNumber >>> 32));
+ return result;
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ReceiverLossListEntry other = (ReceiverLossListEntry) obj;
+ if (sequenceNumber != other.sequenceNumber)
+ return false;
+ return true;
+ }
+
}
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -49,7 +49,9 @@
}
public void insert(SenderLossListEntry obj){
- backingList.add(obj);
+ synchronized (backingList) {
+ if(!backingList.contains(obj))backingList.add(obj);
+ }
}
public void remove(long seqNo){
@@ -61,6 +63,7 @@
return;
}
}
+ //backingList.remove(new SenderLossListEntry(seqNo));
}
/**
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -71,4 +71,27 @@
return (int)(sequenceNumber-o.sequenceNumber);
}
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + (int) (sequenceNumber ^ (sequenceNumber >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ SenderLossListEntry other = (SenderLossListEntry) obj;
+ if (sequenceNumber != other.sequenceNumber)
+ return false;
+ return true;
+ }
+
}
Modified: udt-java/trunk/src/main/java/udt/util/CircularArray.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/CircularArray.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/util/CircularArray.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -35,18 +35,23 @@
import java.util.ArrayList;
import java.util.List;
+/**
+ * Circular array: the most recent value overwrites the oldest one if there is no more free
+ * space in the array
+ */
public class CircularArray<T>{
protected int position=0;
+
protected boolean haveOverflow=false;
- //the maximum number of entries
- protected int max=1;
- protected List<T>circularArray;
+ protected final int max;
+ protected final List<T>circularArray;
+
/**
- * ArrayList von T(object's type). The most recent value overwrite the oldest one
- * if no more free space in the array
+ * Create a new circularArray of the given size
+ *
* @param size
*/
public CircularArray(int size){
@@ -55,10 +60,7 @@
}
/**
- * Insert the specified entry at the specified position in this list.
- * the most recent value overwrite the oldest one
- * if no more free space in the circularArray
- * @param entry
+ * add an entry
*/
public void add(T entry){
if(position>=max){
@@ -74,7 +76,6 @@
/**
* Returns the number of elements in this list
- * @return
*/
public int size(){
return circularArray.size();
@@ -83,7 +84,5 @@
public String toString(){
return circularArray.toString();
}
-
-
-
+
}
Modified: udt-java/trunk/src/main/java/udt/util/FlowWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -74,6 +74,9 @@
*/
@Override
public boolean offer(E e) {
+ if(contains(e)){
+ return true;
+ }
if(size()<capacity){
return super.offer(e);
}else return false;
Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -33,7 +33,6 @@
package udt.util;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.security.MessageDigest;
Added: udt-java/trunk/src/test/java/udt/NullCongestionControl.java
===================================================================
--- udt-java/trunk/src/test/java/udt/NullCongestionControl.java (rev 0)
+++ udt-java/trunk/src/test/java/udt/NullCongestionControl.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -0,0 +1,48 @@
+package udt;
+
+import java.util.List;
+
+public class NullCongestionControl implements CongestionControl {
+
+ private final UDTSession session;
+
+ public NullCongestionControl(UDTSession session){
+ this.session=session;
+ }
+
+ public void close() {
+ }
+
+ public long getCongestionWindowSize() {
+ return Long.MAX_VALUE;
+ }
+
+ public double getSendInterval() {
+ return 0;
+ }
+
+ public void init() {
+ }
+
+ public void onACK(long ackSeqno) {
+ }
+
+ public void onNAK(List<Integer> lossInfo) {
+ }
+
+ public void onPacketReceive(long packetSeqNo) {
+ }
+
+ public void onPacketSend(long packetSeqNo) {
+ }
+
+ public void onTimeout() {
+ }
+
+ public void setPacketArrivalRate(long rate, long linkCapacity) {
+ }
+
+ public void setRTT(long rtt, long rttVar) {
+ }
+
+}
Property changes on: udt-java/trunk/src/test/java/udt/NullCongestionControl.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -7,10 +7,12 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import udt.NullCongestionControl;
import udt.UDTClient;
import udt.UDTInputStream;
import udt.UDTReceiver;
import udt.UDTServerSocket;
+import udt.UDTSession;
import udt.UDTSocket;
import udt.UDTTestBase;
import udt.util.UDTStatistics;
@@ -20,7 +22,8 @@
boolean running=false;
//how many
- int num_packets=200;
+ int num_packets=100;
+
//how large is a single packet
int size=1*1024*1024;
@@ -30,6 +33,7 @@
public void test1()throws Exception{
Logger.getLogger("udt").setLevel(Level.INFO);
+ System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName());
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-16 13:15:20 UTC (rev 17)
+++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-16 21:05:13 UTC (rev 18)
@@ -3,20 +3,17 @@
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
-import java.nio.ByteBuffer;
import java.util.Random;
import junit.framework.TestCase;
/**
* send some data over a UDP connection and measure performance
- *
*/
public class UDPTest extends TestCase {
- final int BUFSIZE=32768;
- final int num_packets=10*1000;
- final int packetSize=1024;
+ final int num_packets=100*1000;
+ final int packetSize=1500;
public void test1()throws Exception{
runServer();
@@ -25,20 +22,15 @@
//generate a test array with random content
N=num_packets*packetSize;
- byte[]data=new byte[N];
+ byte[]data=new byte[packetSize];
new Random().nextBytes(data);
long start=System.currentTimeMillis();
- ByteBuffer bb=ByteBuffer.wrap(data);
- DatagramPacket dp=new DatagramPacket(new byte[BUFSIZE],BUFSIZE);
+ DatagramPacket dp=new DatagramPacket(new byte[packetSize],packetSize);
dp.setAddress(InetAddress.getByName("localhost"));
dp.setPort(65321);
-
- System.out.println("Sending data block of <"+N+"> bytes");
- while(bb.remaining()>0){
- int len=Math.min(bb.remaining(),BUFSIZE);
- byte[]chunk=new byte[len];
- bb.get(chunk);
- dp.setData(chunk);
+ System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes");
+ for(int i=0;i<num_packets;i++){
+ dp.setData(data);
s.send(dp);
}
System.out.println("Finished sending.");
@@ -46,7 +38,8 @@
System.out.println("Server stopped.");
long end=System.currentTimeMillis();
System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms");
- System.out.println("Rate "+N/(end-start)+" Kbytes/sec");
+ System.out.println("Rate "+N/1000/(end-start)+" Mbytes/sec");
+ System.out.println("Rate "+num_packets+" packets/sec");
System.out.println("Server received: "+total);
}
@@ -61,7 +54,7 @@
Runnable serverProcess=new Runnable(){
public void run(){
try{
- byte[]buf=new byte[BUFSIZE];
+ byte[]buf=new byte[packetSize];
DatagramPacket dp=new DatagramPacket(buf,buf.length);
while(true){
serverSocket.receive(dp);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|