[Udt-java-commits] SF.net SVN: udt-java:[24] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2010-04-25 20:32:29
|
Revision: 24
http://udt-java.svn.sourceforge.net/udt-java/?rev=24&view=rev
Author: bschuller
Date: 2010-04-25 20:32:22 +0000 (Sun, 25 Apr 2010)
Log Message:
-----------
simpler InputStream; fix wrong value for EXP interval
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTInputStream.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java
udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
udt-java/trunk/src/test/java/udt/TestUDTInputStream.java
udt-java/trunk/src/test/java/udt/UDTTestBase.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
Added Paths:
-----------
udt-java/trunk/src/main/java/udt/util/MeanValue.java
Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-22 10:06:54 UTC (rev 23)
+++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-25 20:32:22 UTC (rev 24)
@@ -34,7 +34,6 @@
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,13 +65,10 @@
//see the noMoreData() method
private final AtomicBoolean expectMoreData=new AtomicBoolean(true);
-
- private final ByteBuffer buffer;
-
private volatile boolean closed=false;
-
+
private volatile boolean blocking=true;
-
+
/**
* create a new {@link UDTInputStream} connected to the given socket
* @param socket - the {@link UDTSocket}
@@ -83,8 +79,6 @@
this.socket=socket;
this.statistics=statistics;
appData=new FlowWindow<AppData>(getFlowWindowSize());
- buffer=ByteBuffer.allocate(65536);
- buffer.flip();
}
private int getFlowWindowSize(){
@@ -107,7 +101,7 @@
int b=0;
while(b==0)
b=read(single);
-
+
if(b>0){
return single[0];
}
@@ -115,24 +109,39 @@
return b;
}
}
-
+
private AppData currentChunk=null;
+ //offset into currentChunk
int offset=0;
+
@Override
public int read(byte[]target)throws IOException{
try{
- //empty the buffer first
- int read=readFromBuffer(target, 0);
- //if no more space left in target, exit now
- if(read==target.length){
- return target.length;
+ int read=0;
+ updateCurrentChunk();
+ while(currentChunk!=null){
+ byte[]data=currentChunk.data;
+ int length=Math.min(target.length-read,data.length-offset);
+ System.arraycopy(data, offset, target, read, length);
+ read+=length;
+ offset+=length;
+ //check if chunk has been fully read
+ if(offset>=data.length){
+ currentChunk=null;
+ offset=0;
+ }
+
+ //if no more space left in target, exit now
+ if(read==target.length){
+ return read;
+ }
+
+ updateCurrentChunk();
}
- //otherwise try to fill up the buffer
- fillBuffer();
- read+=readFromBuffer(target, read);
+
if(read>0)return read;
if(closed)return -1;
- if(expectMoreData.get() || buffer.remaining()>0 || !appData.isEmpty())return 0;
+ if(expectMoreData.get() || !appData.isEmpty())return 0;
//no more data
return -1;
@@ -143,20 +152,18 @@
}
}
- @Override
- public int available()throws IOException{
- return buffer.remaining();
- }
-
/**
- * write as much data into the ByteBuffer as possible<br/>
+ * Reads the next valid chunk of application data from the queue<br/>
+ *
* In blocking mode,this method will block until data is available or the socket is closed,
- * otherwise wait for at most 10 milliseconds.
- * @returns <code>true</code> if data available
+ * otherwise it will wait for at most 10 milliseconds.
+ *
* @throws InterruptedException
*/
- private boolean fillBuffer()throws IOException{
- if(currentChunk==null){
+ private void updateCurrentChunk()throws IOException{
+ if(currentChunk!=null)return;
+
+ while(true){
try{
if(blocking){
currentChunk=appData.poll(1, TimeUnit.MILLISECONDS);
@@ -170,51 +177,29 @@
ex.initCause(ie);
throw ex;
}
- }
- if(currentChunk!=null){
- //check if the data is in-order
- if(currentChunk.sequenceNumber==highestSequenceNumber+1){
- highestSequenceNumber++;
- statistics.updateReadDataMD5(currentChunk.data);
+ if(currentChunk!=null){
+ //check if the data is in-order
+ if(currentChunk.sequenceNumber==highestSequenceNumber+1){
+ highestSequenceNumber++;
+ statistics.updateReadDataMD5(currentChunk.data);
+ return;
+ }
+ else if(currentChunk.sequenceNumber<=highestSequenceNumber){
+ //duplicate, drop it
+ currentChunk=null;
+ statistics.incNumberOfDuplicateDataPackets();
+ }
+ else{
+ //out of order data, put back into queue and exit
+ appData.offer(currentChunk);
+ currentChunk=null;
+ return;
+ }
}
- else if(currentChunk.sequenceNumber<=highestSequenceNumber){
- //duplicate, drop it
- currentChunk=null;
- statistics.incNumberOfDuplicateDataPackets();
- return false;
- }
- else{
- //out of order data, put back into queue
- appData.offer(currentChunk);
- currentChunk=null;
- return false;
- }
-
- //fill data into the buffer
- buffer.compact();
- int len=Math.min(buffer.remaining(),currentChunk.data.length-offset);
- buffer.put(currentChunk.data,offset,len);
- buffer.flip();
- offset+=len;
- //check if the chunk has been fully read
- if(offset>=currentChunk.data.length){
- currentChunk=null;
- offset=0;
- }
+ else return;
}
- return true;
}
- //read data from the internal buffer into target at the specified offset
- private int readFromBuffer(byte[] target, int offset){
- int available=buffer.remaining();
- int canRead=Math.min(available, target.length-offset);
- if(canRead>0){
- buffer.get(target, offset, canRead);
- }
- return canRead;
- }
-
/**
* new application data
* @param data
@@ -231,7 +216,7 @@
closed=true;
noMoreData();
}
-
+
public UDTSocket getSocket(){
return socket;
}
@@ -243,7 +228,7 @@
public void setBlocking(boolean block){
this.blocking=block;
}
-
+
/**
* notify the input stream that there is no more data
* @throws IOException
@@ -277,7 +262,7 @@
final int prime = 31;
int result = 1;
result = prime * result
- + (int) (sequenceNumber ^ (sequenceNumber >>> 32));
+ + (int) (sequenceNumber ^ (sequenceNumber >>> 32));
return result;
}
@@ -294,8 +279,8 @@
return false;
return true;
}
-
-
+
+
}
}
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-22 10:06:54 UTC (rev 23)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-25 20:32:22 UTC (rev 24)
@@ -125,7 +125,7 @@
private long nextEXP;
//microseconds to next EXP event
- private long EXP_INTERVAL=Util.getSYNTime();
+ private long EXP_INTERVAL=100*Util.getSYNTime();
//instant when the session was created (for expiry checking)
private final long sessionUpSince;
@@ -348,7 +348,7 @@
public static int dropRate=0;
//number of received data packets
private int n=0;
-
+
protected void onDataPacketReceived(DataPacket dp)throws IOException{
long currentSequenceNumber = dp.getPacketSequenceNumber();
//check whether to drop this packet
@@ -367,9 +367,11 @@
long interval=currentDataPacketArrivalTime -lastDataPacketArrivalTime;
packetPairWindow.add(interval);
}
+
//(5).record the packet arrival time in the PKT History Window.
packetHistoryWindow.add(currentDataPacketArrivalTime);
+
//store current time
lastDataPacketArrivalTime=currentDataPacketArrivalTime;
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-22 10:06:54 UTC (rev 23)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-25 20:32:22 UTC (rev 24)
@@ -286,14 +286,12 @@
SenderLossListEntry entry=senderLossList.getFirstEntry();
if (entry!=null) {
long seqNumber = entry.getSequenceNumber();
-
//TODO
//if the current seqNumber is 16n,check the timeOut in the
//loss list and send a message drop request.
//if((seqNumber%16)==0){
//sendLossList.checkTimeOut(timeToLive);
//}
-
try {
//retransmit the packet with the first entry in the list
//as sequence number and remove it from the list
@@ -301,8 +299,7 @@
if(pktToRetransmit!=null){
endpoint.doSend(pktToRetransmit);
statistics.incNumberOfRetransmittedDataPackets();
- }
- senderLossList.remove(seqNumber);
+ }
}catch (Exception e) {
logger.log(Level.WARNING,"",e);
}
@@ -352,6 +349,7 @@
* for processing EXP event (see spec. p 13)
*/
protected void putUnacknowledgedPacketsIntoLossList(){
+
synchronized (sendLock) {
for(Long l: sendBuffer.keySet()){
senderLossList.insert(new SenderLossListEntry(l));
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-22 10:06:54 UTC (rev 23)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-25 20:32:22 UTC (rev 24)
@@ -58,7 +58,8 @@
}
/**
- * gets the loss list entry with the lowest sequence number
+ * retrieves the loss list entry with the lowest sequence number and removes
+ * it from the loss list
*/
public SenderLossListEntry getFirstEntry(){
return backingList.poll();
@@ -68,4 +69,7 @@
return backingList.isEmpty();
}
+ public long size(){
+ return backingList.size();
+ }
}
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-22 10:06:54 UTC (rev 23)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-25 20:32:22 UTC (rev 24)
@@ -94,4 +94,7 @@
return true;
}
+ public String toString(){
+ return "lossListEntry-"+sequenceNumber;
+ }
}
Added: udt-java/trunk/src/main/java/udt/util/MeanValue.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/MeanValue.java (rev 0)
+++ udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-04-25 20:32:22 UTC (rev 24)
@@ -0,0 +1,38 @@
+package udt.util;
+
+import java.text.NumberFormat;
+
+/**
+ * holds a floating mean value
+ */
+public class MeanValue {
+
+ private double mean=0;
+
+ private int n=0;
+
+ private final NumberFormat format;
+
+
+ public MeanValue(){
+ format=NumberFormat.getNumberInstance();
+ format.setMaximumFractionDigits(2);
+ }
+ public void addValue(double value){
+ mean=(mean*n+value)/(n+1);
+ n++;
+ }
+
+ public double getMean(){
+ return mean;
+ }
+
+ public String getFormattedMean(){
+ return format.format(mean);
+ }
+
+ public void clear(){
+ mean=0;
+ n=0;
+ }
+}
Property changes on: udt-java/trunk/src/main/java/udt/util/MeanValue.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-22 10:06:54 UTC (rev 23)
+++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-25 20:32:22 UTC (rev 24)
@@ -150,6 +150,10 @@
this.sendPeriod=sendPeriod;
}
+ public double getSendPeriod(){
+ return sendPeriod;
+ }
+
public void updateReadDataMD5(byte[]data){
digest.update(data);
}
Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2010-04-22 10:06:54 UTC (rev 23)
+++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2010-04-25 20:32:22 UTC (rev 24)
@@ -13,30 +13,46 @@
byte[] data2="a test".getBytes();
byte[] data3=" string".getBytes();
String digest=computeMD5(data1,data2,data3);
- is.haveNewData(0, data1);
- is.haveNewData(1, data2);
- is.haveNewData(2, data3);
+ is.haveNewData(1, data1);
+ is.haveNewData(2, data2);
+ is.haveNewData(3, data3);
is.noMoreData();
is.setBlocking(false);
readAll(is,8);
assertEquals(digest,stat.getDigest());
}
+ public void test2()throws Exception{
+ UDTStatistics stat=new UDTStatistics("test");
+ UDTInputStream is=new UDTInputStream(null, stat);
+ byte[] data1=getRandomData(65537);
+ byte[] data2=getRandomData(1234);
+ byte[] data3=getRandomData(3*1024*1024);
+ String digest=computeMD5(data1,data2,data3);
+ is.setBlocking(false);
+ is.haveNewData(1, data1);
+ is.haveNewData(2, data2);
+ is.haveNewData(3, data3);
+ is.noMoreData();
+ readAll(is,5*1024*1024);
+ assertEquals(digest,stat.getDigest());
+ }
+
public void testInOrder()throws Exception{
UDTStatistics stat=new UDTStatistics("test");
UDTInputStream is=new UDTInputStream(null, stat);
is.setBlocking(false);
- byte[]data=getRandomData(10*1024);
+ byte[]data=getRandomData(10*1024*1024);
byte[][]blocks=makeChunks(10,data);
String digest=computeMD5(blocks);
for(int i=0;i<10;i++){
- is.haveNewData(i, blocks[i]);
+ is.haveNewData(i+1, blocks[i]);
}
is.noMoreData();
- readAll(is,512);
+ readAll(is,1024*999);
assertEquals(digest,stat.getDigest());
}
@@ -52,7 +68,7 @@
byte[]order=new byte[]{9,7,5,3,1,2,0,4,6,8};
for(int i : order){
- is.haveNewData(i, blocks[i]);
+ is.haveNewData(i+1, blocks[i]);
}
readAll(is,512,true);
@@ -70,7 +86,7 @@
is.noMoreData();
if(c==-1)break;
else{
- d.update(buf,0,c);
+ if(c>0)d.update(buf,0,c);
}
}
return UDTStatistics.hexString(d);
Modified: udt-java/trunk/src/test/java/udt/UDTTestBase.java
===================================================================
--- udt-java/trunk/src/test/java/udt/UDTTestBase.java 2010-04-22 10:06:54 UTC (rev 23)
+++ udt-java/trunk/src/test/java/udt/UDTTestBase.java 2010-04-25 20:32:22 UTC (rev 24)
@@ -5,6 +5,8 @@
import java.security.MessageDigest;
import java.util.Random;
+import udt.util.UDTStatistics;
+
import junit.framework.TestCase;
/**
@@ -58,14 +60,7 @@
}
public static String hexString(MessageDigest digest){
- byte[] messageDigest = digest.digest();
- StringBuilder hexString = new StringBuilder();
- for (int i=0;i<messageDigest.length;i++) {
- String hex = Integer.toHexString(0xFF & messageDigest[i]);
- if(hex.length()==1)hexString.append('0');
- hexString.append(hex);
- }
- return hexString.toString();
+ return UDTStatistics.hexString(digest);
}
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-22 10:06:54 UTC (rev 23)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-25 20:32:22 UTC (rev 24)
@@ -21,7 +21,7 @@
boolean running=false;
//how many
- int num_packets=200;
+ int num_packets=100;
//how large is a single packet
int size=1*1024*1024;
@@ -64,7 +64,12 @@
client.sendBlocking(data);
digest.update(data);
double took=System.currentTimeMillis()-block;
- System.out.println("Sent block <"+i+"> in "+took+" ms, rate: "+format.format(size/(1024*took))+ " Mbytes/sec");
+ double arrival=client.getStatistics().getPacketArrivalRate();
+ double snd=client.getStatistics().getSendPeriod();
+ System.out.println("Sent block <"+i+"> in "+took+" ms, "
+ +" pktArr: "+arrival
+ + " snd: "+format.format(snd)
+ +" rate: "+format.format(size/(1024*took))+ " MB/sec");
}
end=System.currentTimeMillis();
client.shutdown();
@@ -74,7 +79,7 @@
while(serverRunning)Thread.sleep(100);
System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms");
- double mbytes=N/(end-start)/1024;
+ double mbytes=N/(end-start)/1024.0;
double mbit=8*mbytes;
System.out.println("Rate: "+format.format(mbytes)+" Mbytes/sec "+format.format(mbit)+" Mbit/sec");
System.out.println("Server received: "+total);
@@ -117,7 +122,6 @@
else{
md5.update(buf, 0, c);
total+=c;
- Thread.yield();
}
}
System.out.println("Server thread exiting, last received bytes: "+c);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|