[Udt-java-commits] SF.net SVN: udt-java:[53] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2011-01-06 16:13:38
|
Revision: 53
http://udt-java.svn.sourceforge.net/udt-java/?rev=53&view=rev
Author: bschuller
Date: 2011-01-06 16:13:32 +0000 (Thu, 06 Jan 2011)
Log Message:
-----------
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTClient.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/main/java/udt/util/MeanValue.java
udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
udt-java/trunk/src/test/java/udt/performance/UDPTest.java
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -117,12 +117,17 @@
if(localPort>0)this.port = localPort;
else port=dgSocket.getLocalPort();
+ configureSocket();
+ }
+
+ protected void configureSocket()throws SocketException{
//set a time out to avoid blocking in doReceive()
dgSocket.setSoTimeout(100000);
//buffer size
dgSocket.setReceiveBufferSize(128*1024);
+ dgSocket.setReuseAddress(false);
}
-
+
/**
* bind to the default network interface on the machine
*
@@ -237,8 +242,6 @@
private long lastDestID=-1;
private UDTSession lastSession;
- //MeanValue v=new MeanValue("receiver processing ",true, 256);
-
private int n=0;
private final Object lock=new Object();
@@ -247,13 +250,10 @@
while(!stopped){
try{
try{
- //v.end();
//will block until a packet is received or timeout has expired
dgSocket.receive(dp);
- //v.begin();
-
Destination peer=new Destination(dp.getAddress(), dp.getPort());
int l=dp.getLength();
UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
Modified: udt-java/trunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTClient.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/UDTClient.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -93,14 +93,19 @@
/**
* sends the given data asynchronously
*
- * @param data
+ * @param data - the data to send
* @throws IOException
- * @throws InterruptedException
*/
- public void send(byte[]data)throws IOException, InterruptedException{
+ public void send(byte[]data)throws IOException{
clientSession.getSocket().doWrite(data);
}
+ /**
+ * sends the given data and waits for acknowledgement
+ * @param data - the data to send
+ * @throws IOException
+ * @throws InterruptedException if interrupted while waiting for ack
+ */
public void sendBlocking(byte[]data)throws IOException, InterruptedException{
clientSession.getSocket().doWriteBlocking(data);
}
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -183,13 +183,13 @@
private MeanValue dataProcessTime;
private void initMetrics(){
if(!storeStatistics)return;
- dgReceiveInterval=new MeanValue("UDT receive interval");
+ dgReceiveInterval=new MeanValue("RECEIVER: UDT receive interval");
statistics.addMetric(dgReceiveInterval);
- dataPacketInterval=new MeanValue("Data packet interval");
+ dataPacketInterval=new MeanValue("RECEIVER: Data packet interval");
statistics.addMetric(dataPacketInterval);
- processTime=new MeanValue("UDT packet process time");
+ processTime=new MeanValue("RECEIVER: UDT packet process time");
statistics.addMetric(processTime);
- dataProcessTime=new MeanValue("Data packet process time");
+ dataProcessTime=new MeanValue("RECEIVER: Data packet process time");
statistics.addMetric(dataProcessTime);
}
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -125,7 +125,7 @@
statistics=session.getStatistics();
senderLossList=new SenderLossList();
sendBuffer=new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(),0.75f,2);
- sendQueue = new ArrayBlockingQueue<DataPacket>(1000);
+ sendQueue = new ArrayBlockingQueue<DataPacket>(session.getFlowWindowSize(), /*fairness*/ true);
lastAckSequenceNumber=session.getInitialSequenceNumber();
currentSequenceNumber=session.getInitialSequenceNumber()-1;
waitForAckLatch.set(new CountDownLatch(1));
@@ -140,11 +140,11 @@
private MeanThroughput throughput;
private void initMetrics(){
if(!storeStatistics)return;
- dgSendTime=new MeanValue("Datagram send time");
+ dgSendTime=new MeanValue("SENDER: Datagram send time");
statistics.addMetric(dgSendTime);
- dgSendInterval=new MeanValue("Datagram send interval");
+ dgSendInterval=new MeanValue("SENDER: Datagram send interval");
statistics.addMetric(dgSendInterval);
- throughput=new MeanThroughput("Throughput", session.getDatagramSize());
+ throughput=new MeanThroughput("SENDER: Throughput", session.getDatagramSize());
statistics.addMetric(throughput);
}
@@ -338,7 +338,7 @@
if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
&& unAcknowledged<session.getFlowWindowSize()){
//check for application data
- DataPacket dp=sendQueue.poll(Util.SYN,TimeUnit.MICROSECONDS);
+ DataPacket dp=sendQueue.poll();
if(dp!=null){
send(dp);
largestSentSequenceNumber=dp.getPacketSequenceNumber();
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -33,6 +33,8 @@
package udt.sender;
import java.util.LinkedList;
+import udt.util.MeanValue;
+
/**
* stores the sequence number of the lost packets in increasing order
*/
@@ -49,15 +51,15 @@
public void insert(Long obj){
synchronized (backingList) {
- if(!backingList.contains(obj)){
- for(int i=0;i<backingList.size();i++){
- if(obj<backingList.get(i)){
- backingList.add(i,obj);
- return;
- }
+ for(int i=0;i<backingList.size();i++){
+ Long entry=backingList.get(i);
+ if(obj<entry){
+ backingList.add(i,obj);
+ return;
}
- backingList.add(obj);
+ else if(obj==entry)return;
}
+ backingList.add(obj);
}
}
@@ -69,7 +71,7 @@
return backingList.poll();
}
}
-
+
public boolean isEmpty(){
return backingList.isEmpty();
}
Modified: udt-java/trunk/src/main/java/udt/util/MeanValue.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/util/MeanValue.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -9,6 +9,8 @@
public class MeanValue {
private double mean=0;
+ private double max=0;
+ private double min=0;
private int n=0;
@@ -43,9 +45,15 @@
public void addValue(double value){
mean=(mean*n+value)/(n+1);
n++;
+ max=Math.max(max, value);
+ min=Math.min(max, value);
+
if(verbose && n % nValue == 0){
- if(msg!=null)System.out.println(msg+" "+getFormattedMean());
- else System.out.println(name+getFormattedMean());
+ if(msg!=null)System.out.println(msg+" "+get());
+ else System.out.println(name+" "+get());
+
+ max=0;
+ min=0;
}
}
@@ -57,6 +65,10 @@
return format.format(getMean());
}
+ public String get(){
+ return format.format(getMean())+" max="+format.format(max)+" min="+format.format(min);
+ }
+
public void clear(){
mean=0;
n=0;
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -44,7 +44,6 @@
lock=new ReentrantLock(false);
notEmpty=lock.newCondition();
highestReadSequenceNumber=SequenceNumber.decrement(initialSequenceNumber);
- System.out.println("SIZE: "+size);
}
public boolean offer(AppData data){
@@ -121,14 +120,6 @@
}
else return null;
}
- // else{
- // System.out.println("empty HEAD at pos="+readPosition);
- // try{
- // Thread.sleep(1000);
- // Thread.yield();
- // }catch(InterruptedException e){};
- // }
-
return r;
}
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -104,6 +104,12 @@
}
long size=decode(sizeInfo, 0);
+ Boolean devNull=Boolean.getBoolean("udt.dev.null");
+ if(devNull){
+ while(true)Thread.sleep(10000);
+ }
+
+
File file=new File(new String(localFile));
System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">");
FileOutputStream fos=new FileOutputStream(file);
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -21,7 +21,7 @@
boolean running=false;
//how many
- int num_packets=200;
+ int num_packets=300;
//how large is a single packet
int size=1*1024*1024;
@@ -32,6 +32,8 @@
public void test1()throws Exception{
Logger.getLogger("udt").setLevel(Level.INFO);
+// System.setProperty("udt.receiver.storeStatistics","true");
+// System.setProperty("udt.sender.storeStatistics","true");
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
doTest();
@@ -109,6 +111,10 @@
Runnable serverProcess=new Runnable(){
public void run(){
try{
+ Boolean devNull=Boolean.getBoolean("udt.dev.null");
+ if(devNull){
+ while(true)Thread.sleep(10000);
+ }
MessageDigest md5=MessageDigest.getInstance("MD5");
long start=System.currentTimeMillis();
UDTSocket s=serverSocket.accept();
Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -23,8 +23,10 @@
public void test1()throws Exception{
runServer();
runThirdThread();
+
//client socket
DatagramSocket s=new DatagramSocket(12345);
+
//generate a test array with random content
N=num_packets*packetSize;
byte[]data=new byte[packetSize];
@@ -34,32 +36,29 @@
dp.setAddress(InetAddress.getByName("localhost"));
dp.setPort(65321);
System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes");
- MeanValue v=new MeanValue("Datagram send time",false);
- MeanValue v2=new MeanValue("Datagram send interval",false);
- MeanValue v3=new MeanValue("Encoding time",false);
+ MeanValue dgSendTime=new MeanValue("Datagram send time",false);
+ MeanValue dgSendInterval=new MeanValue("Datagram send interval",false);
for(int i=0;i<num_packets;i++){
DataPacket p=new DataPacket();
p.setData(data);
- v3.begin();
dp.setData(p.getEncoded());
- v3.end();
- v2.end();
- v.begin();
+ dgSendInterval.end();
+ dgSendTime.begin();
s.send(dp);
- v.end();
- v2.begin();
+ dgSendTime.end();
+ dgSendInterval.begin();
}
System.out.println("Finished sending.");
while(serverRunning)Thread.sleep(10);
System.out.println("Server stopped.");
long end=System.currentTimeMillis();
System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms");
- System.out.println("Rate "+N/1000/(end-start)+" Mbytes/sec");
+ float rate=N/1000/(end-start);
+ System.out.println("Rate "+rate+" Mbytes/sec "+(rate*8)+ " Mbit/sec");
System.out.println("Rate "+num_packets+" packets/sec");
- System.out.println("Mean send time "+v.getFormattedMean()+" microsec");
- System.out.println("Mean send interval "+v2.getFormattedMean()+" microsec");
- System.out.println("Datapacket encoding time "+v3.getFormattedMean()+" microsec");
+ System.out.println("Mean send time "+dgSendTime.get());
+ System.out.println("Mean send interval "+dgSendInterval.get());
System.out.println("Server received: "+total);
}
@@ -79,6 +78,7 @@
while(true){
serverSocket.receive(dp);
handoff.offer(dp);
+ total+=dp.getLength();
}
}
catch(Exception e){
@@ -117,5 +117,5 @@
t.start();
}
-
+
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|