[Udt-java-commits] SF.net SVN: udt-java:[49] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2010-09-15 19:22:25
|
Revision: 49
http://udt-java.svn.sourceforge.net/udt-java/?rev=49&view=rev
Author: bschuller
Date: 2010-09-15 19:22:18 +0000 (Wed, 15 Sep 2010)
Log Message:
-----------
simpler way to implement a receive buffer
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTInputStream.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSession.java
udt-java/trunk/src/test/java/udt/TestUDTInputStream.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
Added Paths:
-----------
udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/trunk/src/test/java/udt/util/
udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java
Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-09-13 18:55:07 UTC (rev 48)
+++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-09-15 19:22:18 UTC (rev 49)
@@ -34,12 +34,10 @@
import java.io.IOException;
import java.io.InputStream;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import udt.util.SequenceNumber;
-import udt.util.UDTStatistics;
+import udt.util.ReceiveBuffer;
/**
* The UDTInputStream receives data blocks from the {@link UDTSocket}
@@ -53,16 +51,8 @@
//the socket owning this inputstream
private final UDTSocket socket;
- //inbound application data, in-order, and ready for reading
- //by the application
- private final PriorityBlockingQueue<AppData>appData;
+ private final ReceiveBuffer receiveBuffer;
- private final UDTStatistics statistics;
-
- //the highest sequence number read by the application, initialised
- //to the initial sequence number minus one
- private volatile long highestSequenceNumber=0;
-
//set to 'false' by the receiver when it gets a shutdown signal from the peer
//see the noMoreData() method
private final AtomicBoolean expectMoreData=new AtomicBoolean(true);
@@ -74,28 +64,15 @@
/**
* create a new {@link UDTInputStream} connected to the given socket
* @param socket - the {@link UDTSocket}
- * @param statistics - the {@link UDTStatistics}
* @throws IOException
*/
- public UDTInputStream(UDTSocket socket, UDTStatistics statistics)throws IOException{
+ public UDTInputStream(UDTSocket socket)throws IOException{
this.socket=socket;
- this.statistics=statistics;
- int capacity=socket!=null? 4*socket.getSession().getFlowWindowSize() : 64 ;
- appData=new PriorityBlockingQueue<AppData>(capacity);
- if(socket!=null){
- highestSequenceNumber=SequenceNumber.decrement(socket.getSession().getInitialSequenceNumber());
- }
+ int capacity=socket!=null? 2 * socket.getSession().getFlowWindowSize() : 128 ;
+ long initialSequenceNum=socket!=null?socket.getSession().getInitialSequenceNumber():1;
+ receiveBuffer=new ReceiveBuffer(capacity,initialSequenceNum);
}
- /**
- * create a new {@link UDTInputStream} connected to the given socket
- * @param socket - the {@link UDTSocket}
- * @throws IOException
- */
- public UDTInputStream(UDTSocket socket)throws IOException{
- this(socket, socket.getSession().getStatistics());
- }
-
private final byte[]single=new byte[1];
@Override
@@ -143,7 +120,7 @@
if(read>0)return read;
if(closed)return -1;
- if(expectMoreData.get() || !appData.isEmpty())return 0;
+ if(expectMoreData.get() || !receiveBuffer.isEmpty())return 0;
//no more data
return -1;
@@ -168,38 +145,19 @@
while(true){
try{
if(block){
- currentChunk=appData.poll(1, TimeUnit.MILLISECONDS);
+ currentChunk=receiveBuffer.poll(1, TimeUnit.MILLISECONDS);
while (!closed && currentChunk==null){
- currentChunk=appData.poll(1000, TimeUnit.MILLISECONDS);
+ currentChunk=receiveBuffer.poll(1000, TimeUnit.MILLISECONDS);
}
}
- else currentChunk=appData.poll(10, TimeUnit.MILLISECONDS);
+ else currentChunk=receiveBuffer.poll(10, TimeUnit.MILLISECONDS);
}catch(InterruptedException ie){
IOException ex=new IOException();
ex.initCause(ie);
throw ex;
}
- if(currentChunk!=null){
- //check if the data is in-order
- long cmp=SequenceNumber.compare(currentChunk.sequenceNumber,highestSequenceNumber+1);
- if(cmp==0){
- highestSequenceNumber=currentChunk.sequenceNumber;
- return;
- }
- else if(cmp<0){
- //duplicate, drop it
- currentChunk=null;
- statistics.incNumberOfDuplicateDataPackets();
- }
- else{
- //out of order data, put back into queue and exit
- appData.offer(currentChunk);
- currentChunk=null;
- return;
- }
- }
- else return;
+ return;
}
}
@@ -209,8 +167,7 @@
*
*/
protected boolean haveNewData(long sequenceNumber,byte[]data)throws IOException{
- if(SequenceNumber.compare(sequenceNumber,highestSequenceNumber)<=0)return true;
- return appData.offer(new AppData(sequenceNumber,data));
+ return receiveBuffer.offer(new AppData(sequenceNumber,data));
}
@Override
@@ -232,6 +189,10 @@
this.blocking=block;
}
+ public int getReceiveBufferSize(){
+ return receiveBuffer.getSize();
+ }
+
/**
* notify the input stream that there is no more data
* @throws IOException
@@ -247,7 +208,7 @@
public static class AppData implements Comparable<AppData>{
final long sequenceNumber;
final byte[] data;
- AppData(long sequenceNumber, byte[]data){
+ public AppData(long sequenceNumber, byte[]data){
this.sequenceNumber=sequenceNumber;
this.data=data;
}
@@ -260,6 +221,10 @@
return sequenceNumber+"["+data.length+"]";
}
+ public long getSequenceNumber(){
+ return sequenceNumber;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-13 18:55:07 UTC (rev 48)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-15 19:22:18 UTC (rev 49)
@@ -388,12 +388,17 @@
//check whether to drop this packet
// n++;
- //if(dropRate>0 && n % dropRate == 0){
+// //if(dropRate>0 && n % dropRate == 0){
// if(n==666){
// logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING");
// return;
// }
-
+//
+ boolean OK=session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData());
+ if(!OK){
+ //need to drop packet...
+ return;
+ }
long currentDataPacketArrivalTime = Util.getCurrentTime();
@@ -412,7 +417,6 @@
//store current time
lastDataPacketArrivalTime=currentDataPacketArrivalTime;
- session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData());
//(6).number of detected lossed packet
/*(6.a).if the number of the current data packet is greater than LSRN+1,
Modified: udt-java/trunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-09-13 18:55:07 UTC (rev 48)
+++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-09-15 19:22:18 UTC (rev 49)
@@ -74,7 +74,7 @@
* flow window size, i.e. how many data packets are
* in-flight at a single time
*/
- protected int flowWindowSize=8192;//4*128;
+ protected int flowWindowSize=1024;
/**
* remote UDT entity (address and socket ID)
Added: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java (rev 0)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49)
@@ -0,0 +1,167 @@
+package udt.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import udt.UDTInputStream.AppData;
+
+/**
+ * 
+ * The receive buffer stores data chunks to be read by the application
+ * 
+ * @author schuller
+ */
+public class ReceiveBuffer {
+
+	//ring of chunk slots; a chunk's slot is its offset from the initial sequence number, modulo size
+	private final AppData[]buffer;
+
+	//the head of the buffer: contains the next chunk to be read by the application,
+	//i.e. the one with the lowest sequence number
+	private volatile int readPosition=0;
+
+	//the lowest sequence number stored in this buffer
+	private final long initialSequenceNumber;
+
+	//the highest sequence number already read by the application (guarded by lock)
+	private long highestReadSequenceNumber;
+
+	//number of chunks
+	private final AtomicInteger numValidChunks=new AtomicInteger(0);
+
+	//lock and condition for poll() with timeout
+	private final Condition notEmpty;
+	private final ReentrantLock lock;
+
+	//the size of the buffer
+	private final int size;
+
+	/**
+	 * create an empty receive buffer
+	 *
+	 * @param size - the number of chunks the buffer can hold
+	 * @param initialSequenceNumber - the sequence number of the first chunk to be read
+	 */
+	public ReceiveBuffer(int size, long initialSequenceNumber){
+		this.size=size;
+		this.buffer=new AppData[size];
+		this.initialSequenceNumber=initialSequenceNumber;
+		lock=new ReentrantLock(false);
+		notEmpty=lock.newCondition();
+		highestReadSequenceNumber=SequenceNumber.decrement(initialSequenceNumber);
+	}
+
+	/**
+	 * store a data chunk in the slot given by its sequence number, and wake up
+	 * a reader waiting in {@link #poll(int, TimeUnit)}
+	 *
+	 * @param data - the chunk to store
+	 * @return <code>false</code> if the chunk cannot be stored (the buffer is full, or the
+	 * chunk is more than one buffer length ahead of the read position), <code>true</code>
+	 * if it was stored or is a duplicate that can be ignored
+	 */
+	public boolean offer(AppData data){
+		if(numValidChunks.get()==size) {
+			return false;
+		}
+		lock.lock();
+		try{
+			long seq=data.getSequenceNumber();
+			//already read by the application: duplicate, discard it
+			if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0)return true;
+			//too far ahead: its slot still belongs to a chunk that was not read yet
+			if(SequenceNumber.seqOffset(highestReadSequenceNumber, seq)>size)return false;
+			//else compute insert position
+			int offset=(int)SequenceNumber.seqOffset(initialSequenceNumber, seq);
+			int insert=offset% size;
+			//a chunk that is already buffered must not be counted twice
+			if(buffer[insert]==null){
+				buffer[insert]=data;
+				numValidChunks.incrementAndGet();
+				//without this, a timed poll() always sleeps for its full timeout
+				notEmpty.signal();
+			}
+			return true;
+		}finally{
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * return a data chunk, guaranteed to be in-order, waiting up to the
+	 * specified wait time if necessary for a chunk to become available.
+	 *
+	 * @param timeout how long to wait before giving up, in units of
+	 * <tt>units</tt>
+	 * @param units a <tt>TimeUnit</tt> determining how to interpret the
+	 * <tt>timeout</tt> parameter
+	 * @return data chunk, or <tt>null</tt> if the
+	 * specified waiting time elapses before an element is available
+	 * @throws InterruptedException if interrupted while waiting
+	 */
+	public AppData poll(int timeout, TimeUnit units)throws InterruptedException{
+		long nanos = units.toNanos(timeout);
+		lock.lockInterruptibly();
+		try {
+			for (;;) {
+				//null here also means "chunks are buffered, but the next in-order one is missing"
+				AppData next=poll();
+				if (next != null) {
+					return next;
+				}
+				if (nanos <= 0)
+					return null;
+				try {
+					nanos = notEmpty.awaitNanos(nanos);
+				} catch (InterruptedException ie) {
+					notEmpty.signal(); // propagate to non-interrupted thread
+					throw ie;
+				}
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * return a data chunk, guaranteed to be in-order.
+	 *
+	 * @return the next in-order chunk, or <code>null</code> if it is not available yet
+	 */
+	public AppData poll(){
+		//takes the lock, because offer() reads the state that is changed here
+		lock.lock();
+		try{
+			AppData r=buffer[readPosition];
+			if(r==null)return null;
+			long thisSeq=r.getSequenceNumber();
+			if(1!=SequenceNumber.seqOffset(highestReadSequenceNumber,thisSeq))return null;
+			increment();
+			highestReadSequenceNumber=thisSeq;
+			return r;
+		}finally{
+			lock.unlock();
+		}
+	}
+
+	/** @return the size of the buffer, as passed to the constructor */
+	public int getSize(){
+		return size;
+	}
+
+	//free the head slot and advance the read position; the caller must hold the lock
+	void increment(){
+		buffer[readPosition]=null;
+		readPosition++;
+		if(readPosition==size)readPosition=0;
+		numValidChunks.decrementAndGet();
+	}
+
+	/** @return <code>true</code> if no chunks are buffered */
+	public boolean isEmpty(){
+		return numValidChunks.get()==0;
+	}
+
+}
Property changes on: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2010-09-13 18:55:07 UTC (rev 48)
+++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2010-09-15 19:22:18 UTC (rev 49)
@@ -1,15 +1,17 @@
package udt;
import java.security.MessageDigest;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
-import udt.util.UDTStatistics;
import udt.util.Util;
public class TestUDTInputStream extends UDTTestBase{
public void test1()throws Exception{
- UDTStatistics stat=new UDTStatistics("test");
- UDTInputStream is=new UDTInputStream(null, stat);
+ UDTInputStream is=new UDTInputStream(null);
byte[] data1="this is ".getBytes();
byte[] data2="a test".getBytes();
byte[] data3=" string".getBytes();
@@ -24,8 +26,7 @@
}
public void test2()throws Exception{
- UDTStatistics stat=new UDTStatistics("test");
- UDTInputStream is=new UDTInputStream(null, stat);
+ UDTInputStream is=new UDTInputStream(null);
byte[] data1=getRandomData(65537);
byte[] data2=getRandomData(1234);
byte[] data3=getRandomData(3*1024*1024);
@@ -40,8 +41,7 @@
}
public void testInOrder()throws Exception{
- UDTStatistics stat=new UDTStatistics("test");
- UDTInputStream is=new UDTInputStream(null, stat);
+ UDTInputStream is=new UDTInputStream(null);
is.setBlocking(false);
byte[]data=getRandomData(10*1024*1024);
@@ -58,8 +58,7 @@
}
public void testRandomOrder()throws Exception{
- UDTStatistics stat=new UDTStatistics("test");
- UDTInputStream is=new UDTInputStream(null, stat);
+ UDTInputStream is=new UDTInputStream(null);
is.setBlocking(false);
byte[]data=getRandomData(100*1024);
@@ -76,6 +75,50 @@
assertEquals(digest,readMD5);
}
+
+
+	public void testLargeDataSetTwoThreads()throws Exception{
+		final UDTInputStream is=new UDTInputStream(null);
+		is.setBlocking(false);
+		int n=100;
+		assertTrue("ERROR IN UNIT TEST : too many packets!",n<=is.getReceiveBufferSize());
+		final byte[]data=getRandomData(n*1024);
+		final byte[][]blocks=makeChunks(n,data);
+		String digest=computeMD5(blocks);
+
+		//a Callable, so that a writer error reaches the test thread through its Future
+		Callable<Void> write=new Callable<Void>(){
+			public Void call() throws Exception {
+				try{
+					for(int i=0;i<blocks.length;i++){
+						while(!is.haveNewData(i+1, blocks[i])){
+							Thread.sleep(100);
+						}
+					}
+				}finally{
+					is.noMoreData(); //also on failure, so the reader is not left waiting for data
+				}
+				return null;
+			}
+		};
+
+		Callable<String> reader=new Callable<String>(){
+			public String call() throws Exception {
+				return readAll(is,1024*999);
+			}
+		};
+
+		ScheduledExecutorService es=Executors.newScheduledThreadPool(2);
+		try{
+			Future<Void> written=es.submit(write);
+			Future<String> result=es.submit(reader);
+			written.get(); //rethrows a writer failure as ExecutionException
+			assertEquals(digest,result.get());
+		}finally{
+			es.shutdownNow();
+		}
+	}
+
//read and discard data from the given input stream
//returns the md5 digest of the data
protected String readAll(UDTInputStream is, int bufsize,boolean sendNoMoreData)throws Exception{
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-13 18:55:07 UTC (rev 48)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-15 19:22:18 UTC (rev 49)
@@ -21,7 +21,7 @@
boolean running=false;
//how many
- int num_packets=50;
+ int num_packets=500;
//how large is a single packet
int size=1*1024*1024;
Added: udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java (rev 0)
+++ udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49)
@@ -0,0 +1,146 @@
+package udt.util;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+import udt.UDTInputStream.AppData;
+
+public class TestReceiveBuffer extends TestCase{
+
+	//chunks offered in sequence order are returned in that order
+	public void testInOrder(){
+		ReceiveBuffer b=new ReceiveBuffer(16,1);
+		byte[]test1="test1".getBytes();
+		byte[]test2="test2".getBytes();
+		byte[]test3="test3".getBytes();
+
+		b.offer(new AppData(1L,test1));
+		b.offer(new AppData(2L,test2));
+		b.offer(new AppData(3L,test3));
+
+		AppData a=b.poll();
+		assertEquals(1L,a.getSequenceNumber());
+
+		a=b.poll();
+		assertEquals(2L,a.getSequenceNumber());
+
+		a=b.poll();
+		assertEquals(3L,a.getSequenceNumber());
+
+		assertNull(b.poll());
+	}
+
+	//chunks offered in reverse order are still returned in sequence order
+	public void testOutOfOrder(){
+		ReceiveBuffer b=new ReceiveBuffer(16,1);
+		byte[]test1="test1".getBytes();
+		byte[]test2="test2".getBytes();
+		byte[]test3="test3".getBytes();
+
+		b.offer(new AppData(3L,test3));
+		b.offer(new AppData(2L,test2));
+		b.offer(new AppData(1L,test1));
+
+		AppData a=b.poll();
+		assertEquals(1L,a.getSequenceNumber());
+
+		a=b.poll();
+		assertEquals(2L,a.getSequenceNumber());
+
+		a=b.poll();
+		assertEquals(3L,a.getSequenceNumber());
+
+		assertNull(b.poll());
+	}
+
+	//poll() returns null while the next in-order chunk is missing, and resumes once it arrives
+	public void testInterleaved(){
+		ReceiveBuffer b=new ReceiveBuffer(16,1);
+		byte[]test1="test1".getBytes();
+		byte[]test2="test2".getBytes();
+		byte[]test3="test3".getBytes();
+
+		b.offer(new AppData(3L,test3));
+
+		b.offer(new AppData(1L,test1));
+
+		AppData a=b.poll();
+		assertEquals(1L,a.getSequenceNumber());
+
+		assertNull(b.poll());
+
+		b.offer(new AppData(2L,test2));
+
+		a=b.poll();
+		assertEquals(2L,a.getSequenceNumber());
+
+		a=b.poll();
+		assertEquals(3L,a.getSequenceNumber());
+	}
+
+	//slots are reused after reading, so more chunks than the buffer size can pass through
+	public void testOverflow(){
+		ReceiveBuffer b=new ReceiveBuffer(4,1);
+
+		for(int i=0; i<3; i++){
+			b.offer(new AppData(i+1,"test".getBytes()));
+		}
+		for(int i=0; i<3; i++){
+			assertEquals(i+1, b.poll().getSequenceNumber());
+		}
+
+		for(int i=0; i<3; i++){
+			b.offer(new AppData(i+4,"test".getBytes()));
+		}
+		for(int i=0; i<3; i++){
+			assertEquals(i+4, b.poll().getSequenceNumber());
+		}
+	}
+
+	//a reader using the timed poll() receives all chunks from a slower writer thread
+	public void testTimedPoll()throws Exception{
+		final ReceiveBuffer b=new ReceiveBuffer(4,1);
+
+		//writer and reader are both Callables, so that a failure in either one
+		//reaches the test thread through its Future
+		Callable<Void> write=new Callable<Void>(){
+			public Void call() throws Exception {
+				for(int i=0; i<5; i++){
+					Thread.sleep(500);
+					b.offer(new AppData(i+1,"test".getBytes()));
+				}
+				return null;
+			}
+		};
+
+		Callable<String> reader=new Callable<String>(){
+			public String call() throws Exception {
+				for(int i=0; i<5; i++){
+					AppData r=null;
+					do{
+						//an interrupt ends the reader instead of being swallowed
+						r=b.poll(200, TimeUnit.MILLISECONDS);
+					}while(r==null);
+					assertEquals(i+1, r.getSequenceNumber());
+				}
+				return "OK.";
+			}
+		};
+
+		ScheduledExecutorService es=Executors.newScheduledThreadPool(2);
+		try{
+			Future<Void>written=es.submit(write);
+			Future<String>res=es.submit(reader);
+			//bounded wait, so a stuck reader fails the test instead of hanging it
+			assertEquals("OK.", res.get(30, TimeUnit.SECONDS));
+			written.get(30, TimeUnit.SECONDS);
+		}finally{
+			es.shutdownNow();
+		}
+	}
+
+}
Property changes on: udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|