[Udt-java-commits] SF.net SVN: udt-java:[49] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2010-09-15 19:22:25
|
Revision: 49 http://udt-java.svn.sourceforge.net/udt-java/?rev=49&view=rev Author: bschuller Date: 2010-09-15 19:22:18 +0000 (Wed, 15 Sep 2010) Log Message: ----------- simpler way to implement a receive buffer Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTInputStream.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/test/java/udt/TestUDTInputStream.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Added Paths: ----------- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/trunk/src/test/java/udt/util/ udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-09-13 18:55:07 UTC (rev 48) +++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-09-15 19:22:18 UTC (rev 49) @@ -34,12 +34,10 @@ import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import udt.util.SequenceNumber; -import udt.util.UDTStatistics; +import udt.util.ReceiveBuffer; /** * The UDTInputStream receives data blocks from the {@link UDTSocket} @@ -53,16 +51,8 @@ //the socket owning this inputstream private final UDTSocket socket; - //inbound application data, in-order, and ready for reading - //by the application - private final PriorityBlockingQueue<AppData>appData; + private final ReceiveBuffer receiveBuffer; - private final UDTStatistics statistics; - - //the highest sequence number read by the application, initialised - //to the initial sequence number minus one - private volatile long highestSequenceNumber=0; - //set to 'false' by the receiver when it gets a shutdown signal from the peer //see the noMoreData() method private final AtomicBoolean expectMoreData=new AtomicBoolean(true); @@ -74,28 +64,15 @@ /** * create a new {@link UDTInputStream} connected to the given socket * @param socket - the {@link UDTSocket} - * @param statistics - the {@link UDTStatistics} * @throws IOException */ - public UDTInputStream(UDTSocket socket, UDTStatistics statistics)throws IOException{ + public UDTInputStream(UDTSocket socket)throws IOException{ this.socket=socket; - this.statistics=statistics; - int capacity=socket!=null? 4*socket.getSession().getFlowWindowSize() : 64 ; - appData=new PriorityBlockingQueue<AppData>(capacity); - if(socket!=null){ - highestSequenceNumber=SequenceNumber.decrement(socket.getSession().getInitialSequenceNumber()); - } + int capacity=socket!=null? 2 * socket.getSession().getFlowWindowSize() : 128 ; + long initialSequenceNum=socket!=null?socket.getSession().getInitialSequenceNumber():1; + receiveBuffer=new ReceiveBuffer(capacity,initialSequenceNum); } - /** - * create a new {@link UDTInputStream} connected to the given socket - * @param socket - the {@link UDTSocket} - * @throws IOException - */ - public UDTInputStream(UDTSocket socket)throws IOException{ - this(socket, socket.getSession().getStatistics()); - } - private final byte[]single=new byte[1]; @Override @@ -143,7 +120,7 @@ if(read>0)return read; if(closed)return -1; - if(expectMoreData.get() || !appData.isEmpty())return 0; + if(expectMoreData.get() || !receiveBuffer.isEmpty())return 0; //no more data return -1; @@ -168,38 +145,19 @@ while(true){ try{ if(block){ - currentChunk=appData.poll(1, TimeUnit.MILLISECONDS); + currentChunk=receiveBuffer.poll(1, TimeUnit.MILLISECONDS); while (!closed && currentChunk==null){ - currentChunk=appData.poll(1000, TimeUnit.MILLISECONDS); + currentChunk=receiveBuffer.poll(1000, TimeUnit.MILLISECONDS); } } - else currentChunk=appData.poll(10, TimeUnit.MILLISECONDS); + else currentChunk=receiveBuffer.poll(10, TimeUnit.MILLISECONDS); }catch(InterruptedException ie){ IOException ex=new IOException(); ex.initCause(ie); throw ex; } - if(currentChunk!=null){ - //check if the data is in-order - long cmp=SequenceNumber.compare(currentChunk.sequenceNumber,highestSequenceNumber+1); - if(cmp==0){ - highestSequenceNumber=currentChunk.sequenceNumber; - return; - } - else if(cmp<0){ - //duplicate, drop it - currentChunk=null; - statistics.incNumberOfDuplicateDataPackets(); - } - else{ - //out of order data, put back into queue and exit - appData.offer(currentChunk); - currentChunk=null; - return; - } - } - else return; + return; } } @@ -209,8 +167,7 @@ * */ protected boolean haveNewData(long sequenceNumber,byte[]data)throws IOException{ - if(SequenceNumber.compare(sequenceNumber,highestSequenceNumber)<=0)return true; - return appData.offer(new AppData(sequenceNumber,data)); + return receiveBuffer.offer(new AppData(sequenceNumber,data)); } @Override @@ -232,6 +189,10 @@ this.blocking=block; } + public int getReceiveBufferSize(){ + return receiveBuffer.getSize(); + } + /** * notify the input stream that there is no more data * @throws IOException @@ -247,7 +208,7 @@ public static class AppData implements Comparable<AppData>{ final long sequenceNumber; final byte[] data; - AppData(long sequenceNumber, byte[]data){ + public AppData(long sequenceNumber, byte[]data){ this.sequenceNumber=sequenceNumber; this.data=data; } @@ -260,6 +221,10 @@ return sequenceNumber+"["+data.length+"]"; } + public long getSequenceNumber(){ + return sequenceNumber; + } + @Override public int hashCode() { final int prime = 31; Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-13 18:55:07 UTC (rev 48) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-15 19:22:18 UTC (rev 49) @@ -388,12 +388,17 @@ //check whether to drop this packet // n++; - //if(dropRate>0 && n % dropRate == 0){ +// //if(dropRate>0 && n % dropRate == 0){ // if(n==666){ // logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING"); // return; // } - +// + boolean OK=session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData()); + if(!OK){ + //need to drop packet... + return; + } long currentDataPacketArrivalTime = Util.getCurrentTime(); @@ -412,7 +417,6 @@ //store current time lastDataPacketArrivalTime=currentDataPacketArrivalTime; - session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData()); //(6).number of detected lossed packet /*(6.a).if the number of the current data packet is greater than LSRN+1, Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-09-13 18:55:07 UTC (rev 48) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-09-15 19:22:18 UTC (rev 49) @@ -74,7 +74,7 @@ * flow window size, i.e. how many data packets are * in-flight at a single time */ - protected int flowWindowSize=8192;//4*128; + protected int flowWindowSize=1024; /** * remote UDT entity (address and socket ID) Added: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java (rev 0) +++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49) @@ -0,0 +1,149 @@ +package udt.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import udt.UDTInputStream.AppData; + +/** + * + * The receive buffer stores data chunks to be read by the application + * + * @author schuller + */ +public class ReceiveBuffer { + + private final AppData[]buffer; + + //the head of the buffer: contains the next chunk to be read by the application, + //i.e. the one with the lowest sequence number + private volatile int readPosition=0; + + //the lowest sequence number stored in this buffer + private final long initialSequenceNumber; + + //the highest sequence number already read by the application + private long highestReadSequenceNumber; + + //number of chunks + private final AtomicInteger numValidChunks=new AtomicInteger(0); + + //lock and condition for poll() with timeout + private final Condition notEmpty; + private final ReentrantLock lock; + + //the size of the buffer + private final int size; + + public ReceiveBuffer(int size, long initialSequenceNumber){ + this.size=size; + this.buffer=new AppData[size]; + this.initialSequenceNumber=initialSequenceNumber; + lock=new ReentrantLock(false); + notEmpty=lock.newCondition(); + highestReadSequenceNumber=SequenceNumber.decrement(initialSequenceNumber); + System.out.println("SIZE: "+size); + } + + public boolean offer(AppData data){ + if(numValidChunks.get()==size) { + return false; + } + lock.lock(); + try{ + long seq=data.getSequenceNumber(); + //if already have this chunk, discard it + if(SequenceNumber.compare(seq, initialSequenceNumber)<0)return true; + //else compute insert position + int offset=(int)SequenceNumber.seqOffset(initialSequenceNumber, seq); + int insert=offset% size; + buffer[insert]=data; + numValidChunks.incrementAndGet(); + return true; + }finally{ + lock.unlock(); + } + } + + /** + * return a data chunk, guaranteed to be in-order, waiting up to the + * specified wait time if necessary for a chunk to become available. + * + * @param timeout how long to wait before giving up, in units of + * <tt>unit</tt> + * @param unit a <tt>TimeUnit</tt> determining how to interpret the + * <tt>timeout</tt> parameter + * @return data chunk, or <tt>null</tt> if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ + public AppData poll(int timeout, TimeUnit units)throws InterruptedException{ + lock.lockInterruptibly(); + long nanos = units.toNanos(timeout); + + try { + for (;;) { + if (numValidChunks.get() != 0) { + return poll(); + } + if (nanos <= 0) + return null; + try { + nanos = notEmpty.awaitNanos(nanos); + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to non-interrupted thread + throw ie; + } + + } + } finally { + lock.unlock(); + } + } + + + /** + * return a data chunk, guaranteed to be in-order. + */ + public AppData poll(){ + if(numValidChunks.get()==0){ + return null; + } + AppData r=buffer[readPosition]; + if(r!=null){ + long thisSeq=r.getSequenceNumber(); + if(1==SequenceNumber.seqOffset(highestReadSequenceNumber,thisSeq)){ + increment(); + highestReadSequenceNumber=thisSeq; + } + else return null; + } + // else{ + // System.out.println("empty HEAD at pos="+readPosition); + // try{ + // Thread.sleep(1000); + // Thread.yield(); + // }catch(InterruptedException e){}; + // } + + return r; + } + + public int getSize(){ + return size; + } + + void increment(){ + buffer[readPosition]=null; + readPosition++; + if(readPosition==size)readPosition=0; + numValidChunks.decrementAndGet(); + } + + public boolean isEmpty(){ + return numValidChunks.get()==0; + } + +} Property changes on: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java ___________________________________________________________________ Added: svn:mime-type + text/plain Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2010-09-13 18:55:07 UTC (rev 48) +++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2010-09-15 19:22:18 UTC (rev 49) @@ -1,15 +1,17 @@ package udt; import java.security.MessageDigest; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; -import udt.util.UDTStatistics; import udt.util.Util; public class TestUDTInputStream extends UDTTestBase{ public void test1()throws Exception{ - UDTStatistics stat=new UDTStatistics("test"); - UDTInputStream is=new UDTInputStream(null, stat); + UDTInputStream is=new UDTInputStream(null); byte[] data1="this is ".getBytes(); byte[] data2="a test".getBytes(); byte[] data3=" string".getBytes(); @@ -24,8 +26,7 @@ } public void test2()throws Exception{ - UDTStatistics stat=new UDTStatistics("test"); - UDTInputStream is=new UDTInputStream(null, stat); + UDTInputStream is=new UDTInputStream(null); byte[] data1=getRandomData(65537); byte[] data2=getRandomData(1234); byte[] data3=getRandomData(3*1024*1024); @@ -40,8 +41,7 @@ } public void testInOrder()throws Exception{ - UDTStatistics stat=new UDTStatistics("test"); - UDTInputStream is=new UDTInputStream(null, stat); + UDTInputStream is=new UDTInputStream(null); is.setBlocking(false); byte[]data=getRandomData(10*1024*1024); @@ -58,8 +58,7 @@ } public void testRandomOrder()throws Exception{ - UDTStatistics stat=new UDTStatistics("test"); - UDTInputStream is=new UDTInputStream(null, stat); + UDTInputStream is=new UDTInputStream(null); is.setBlocking(false); byte[]data=getRandomData(100*1024); @@ -76,6 +75,50 @@ assertEquals(digest,readMD5); } + + + public void testLargeDataSetTwoThreads()throws Exception{ + final UDTInputStream is=new UDTInputStream(null); + is.setBlocking(false); + int n=100; + assertTrue("ERROR IN UNIT TEST : too many packets!",n<=is.getReceiveBufferSize()); + final byte[]data=getRandomData(n*1024); + final byte[][]blocks=makeChunks(n,data); + String digest=computeMD5(blocks); + + Runnable write=new Runnable(){ + public void run(){ + try{ + for(int i=0;i<blocks.length;i++){ + while(!is.haveNewData(i+1, blocks[i])){ + Thread.yield(); + Thread.sleep(100); + } + } + is.noMoreData(); + }catch(Exception e){ + e.printStackTrace(); + fail(); + } + } + }; + + Callable<String> reader=new Callable<String>(){ + public String call() throws Exception { + String md5=readAll(is,1024*999); + return md5; + } + }; + + ScheduledExecutorService es=Executors.newScheduledThreadPool(2); + es.execute(write); + Future<String> result=es.submit(reader); + String readMD5=result.get(); + + assertEquals(digest,readMD5); + es.shutdownNow(); + } + //read and discard data from the given input stream //returns the md5 digest of the data protected String readAll(UDTInputStream is, int bufsize,boolean sendNoMoreData)throws Exception{ Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-13 18:55:07 UTC (rev 48) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-15 19:22:18 UTC (rev 49) @@ -21,7 +21,7 @@ boolean running=false; //how many - int num_packets=50; + int num_packets=500; //how large is a single packet int size=1*1024*1024; Added: udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java =================================================================== --- udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java (rev 0) +++ udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49) @@ -0,0 +1,142 @@ +package udt.util; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; +import udt.UDTInputStream.AppData; + +public class TestReceiveBuffer extends TestCase{ + + public void testInOrder(){ + ReceiveBuffer b=new ReceiveBuffer(16,1); + byte[]test1="test1".getBytes(); + byte[]test2="test2".getBytes(); + byte[]test3="test3".getBytes(); + + b.offer(new AppData(1l,test1)); + b.offer(new AppData(2l,test2)); + b.offer(new AppData(3l,test3)); + + AppData a=b.poll(); + assertEquals(1l,a.getSequenceNumber()); + + a=b.poll(); + assertEquals(2l,a.getSequenceNumber()); + + a=b.poll(); + assertEquals(3l,a.getSequenceNumber()); + + assertNull(b.poll()); + } + + public void testOutOfOrder(){ + ReceiveBuffer b=new ReceiveBuffer(16,1); + byte[]test1="test1".getBytes(); + byte[]test2="test2".getBytes(); + byte[]test3="test3".getBytes(); + + b.offer(new AppData(3l,test3)); + b.offer(new AppData(2l,test2)); + b.offer(new AppData(1l,test1)); + + AppData a=b.poll(); + assertEquals(1l,a.getSequenceNumber()); + + a=b.poll(); + assertEquals(2l,a.getSequenceNumber()); + + a=b.poll(); + assertEquals(3l,a.getSequenceNumber()); + + assertNull(b.poll()); + } + + public void testInterleaved(){ + ReceiveBuffer b=new ReceiveBuffer(16,1); + byte[]test1="test1".getBytes(); + byte[]test2="test2".getBytes(); + byte[]test3="test3".getBytes(); + + b.offer(new AppData(3l,test3)); + + b.offer(new AppData(1l,test1)); + + AppData a=b.poll(); + assertEquals(1l,a.getSequenceNumber()); + + assertNull(b.poll()); + + b.offer(new AppData(2l,test2)); + + a=b.poll(); + assertEquals(2l,a.getSequenceNumber()); + + a=b.poll(); + assertEquals(3l,a.getSequenceNumber()); + } + + public void testOverflow(){ + ReceiveBuffer b=new ReceiveBuffer(4,1); + + for(int i=0; i<3; i++){ + b.offer(new AppData(i+1,"test".getBytes())); + } + for(int i=0; i<3; i++){ + assertEquals(i+1, b.poll().getSequenceNumber()); + } + + for(int i=0; i<3; i++){ + b.offer(new AppData(i+4,"test".getBytes())); + } + for(int i=0; i<3; i++){ + assertEquals(i+4, b.poll().getSequenceNumber()); + } + } + + + public void testTimedPoll()throws Exception{ + final ReceiveBuffer b=new ReceiveBuffer(4,1); + + Runnable write=new Runnable(){ + + public void run(){ + try{ + for(int i=0; i<5; i++){ + Thread.sleep(500); + b.offer(new AppData(i+1,"test".getBytes())); + } + }catch(Exception e){ + e.printStackTrace(); + fail(); + } + } + }; + + Callable<String> reader=new Callable<String>(){ + public String call() throws Exception { + for(int i=0; i<5; i++){ + AppData r=null; + do{ + try{ + r=b.poll(200, TimeUnit.MILLISECONDS); + }catch(InterruptedException ie){ + ie.printStackTrace(); + } + }while(r==null); + } + return "OK."; + } + }; + + ScheduledExecutorService es=Executors.newScheduledThreadPool(2); + es.execute(write); + Future<String>res=es.submit(reader); + res.get(); + es.shutdownNow(); + } + +} Property changes on: udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java ___________________________________________________________________ Added: svn:mime-type + text/plain This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |