[Udt-java-commits] SF.net SVN: udt-java:[70] udt-java/trunk/src
Status: Alpha
Brought to you by: bschuller
From: <bsc...@us...> - 2012-02-03 09:09:43
|
Revision: 70 http://udt-java.svn.sourceforge.net/udt-java/?rev=70&view=rev Author: bschuller Date: 2012-02-03 09:09:37 +0000 (Fri, 03 Feb 2012) Log Message: ----------- roll back change to FlowWindow; add some comments to make it easier to understand Modified Paths: -------------- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java Modified: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2012-02-03 06:57:00 UTC (rev 69) +++ udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2012-02-03 09:09:37 UTC (rev 70) @@ -6,10 +6,12 @@ /** * - * holds a fixed number of {@link DataPacket} instances which are sent out. + * Holds a fixed number of {@link DataPacket} instances which are sent out.<br/> * - * it is assumed that a single thread stores new data, and another single thread - * reads/removes data + * it is assumed that a single thread (the producer) stores new data, + * and another single thread (the consumer) reads/removes data.<br/> + * + * * * @author schuller */ @@ -23,12 +25,15 @@ private volatile boolean isFull=false; + //valid entries that can be read private volatile int validEntries=0; private volatile boolean isCheckout=false; + //index where the next data packet will be written to private volatile int writePos=0; + //one before the index where the next data packet will be read from private volatile int readPos=-1; private volatile int consumed=0; @@ -42,7 +47,7 @@ * @param chunksize - data chunk size */ public FlowWindow(int size, int chunksize){ - this.length=size; + this.length=size+1; packets=new DataPacket[length]; for(int i=0;i<packets.length;i++){ packets[i]=new DataPacket(); @@ -64,21 +69,25 @@ } if(isCheckout)throw new 
IllegalStateException(); isCheckout=true; - DataPacket p=packets[writePos]; - return p; + return packets[writePos]; }finally{ lock.unlock(); } } + /** + * notify the flow window that the data packet obtained by {@link #getForProducer()} + * has been filled with data and is ready for sending out + */ public void produce(){ lock.lock(); try{ + if(!isCheckout)throw new IllegalStateException(); isCheckout=false; writePos++; if(writePos==length)writePos=0; validEntries++; - isFull=validEntries==length; + isFull=validEntries==length-1; isEmpty=false; produced++; }finally{ @@ -88,11 +97,11 @@ public DataPacket consumeData(){ - if(isEmpty){ - return null; - } lock.lock(); try{ + if(isEmpty){ + return null; + } readPos++; DataPacket p=packets[readPos]; if(readPos==length-1)readPos=-1; @@ -133,6 +142,7 @@ StringBuilder sb=new StringBuilder(); sb.append("FlowWindow size=").append(length); sb.append(" full=").append(isFull).append(" empty=").append(isEmpty); + sb.append(" readPos=").append(readPos).append(" writePos=").append(writePos); sb.append(" consumed=").append(consumed).append(" produced=").append(produced); return sb.toString(); } Modified: udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2012-02-03 06:57:00 UTC (rev 69) +++ udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2012-02-03 09:09:37 UTC (rev 70) @@ -25,9 +25,9 @@ @Test public void testWithoutLoss()throws Exception{ - Logger.getLogger("udt").setLevel(Level.WARNING); + Logger.getLogger("udt").setLevel(Level.INFO); UDTReceiver.dropRate=0; - num_packets=640; + num_packets=1000; TIMEOUT=Integer.MAX_VALUE; doTest(); } @@ -37,9 +37,9 @@ public void testWithLoss()throws Exception{ UDTReceiver.dropRate=3; TIMEOUT=Integer.MAX_VALUE; - num_packets=512; + num_packets=100; //set log level - Logger.getLogger("udt").setLevel(Level.WARNING); + 
Logger.getLogger("udt").setLevel(Level.INFO); doTest(); } @@ -48,9 +48,9 @@ public void testLargeDataSet()throws Exception{ UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; - num_packets=3*1024; + num_packets=100; //set log level - Logger.getLogger("udt").setLevel(Level.WARNING); + Logger.getLogger("udt").setLevel(Level.INFO); doTest(); } Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2012-02-03 06:57:00 UTC (rev 69) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2012-02-03 09:09:37 UTC (rev 70) @@ -6,6 +6,7 @@ import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -18,7 +19,7 @@ */ public class UDPTest { - final int num_packets=10*10*1000; + final int num_packets=1000; final int packetSize=UDPEndPoint.DATAGRAM_SIZE; @Test @@ -48,6 +49,7 @@ dgSendInterval.end(); dgSendTime.begin(); s.send(dp); + Thread.sleep(5); dgSendTime.end(); dgSendInterval.begin(); } @@ -76,11 +78,10 @@ public void run(){ try{ byte[]buf=new byte[packetSize]; - DatagramPacket dp=new DatagramPacket(buf,buf.length); while(true){ + DatagramPacket dp=new DatagramPacket(buf,buf.length); serverSocket.receive(dp); handoff.offer(dp); - total+=dp.getLength(); } } catch(Exception e){ @@ -91,6 +92,7 @@ }; Thread t=new Thread(serverProcess); t.start(); + System.out.println("Server started."); } private final BlockingQueue<DatagramPacket> handoff=new SynchronousQueue<DatagramPacket>(); @@ -99,11 +101,15 @@ Runnable serverProcess=new Runnable(){ public void run(){ try{ + int counter=0; long start=System.currentTimeMillis(); - while(true){ - DatagramPacket dp=handoff.poll(); - if(dp!=null)total+=dp.getLength(); - if(total==N)break; + while(counter<num_packets){ + DatagramPacket dp=handoff.poll(10, TimeUnit.MILLISECONDS); + 
if(dp!=null){ + total+=dp.getLength(); + counter++; + System.out.println("Count: "+counter); + } } long end=System.currentTimeMillis(); System.out.println("Server time: "+(end-start)+" ms."); @@ -117,6 +123,7 @@ }; Thread t=new Thread(serverProcess); t.start(); + System.out.println("Hand-off thread started."); } Modified: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java =================================================================== --- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2012-02-03 06:57:00 UTC (rev 69) +++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2012-02-03 09:09:37 UTC (rev 70) @@ -33,6 +33,7 @@ DataPacket no=fw.getForProducer(); assertNull("Window should be full",no); + assertTrue(fw.isFull()); DataPacket c1=fw.consumeData(); //must be p1 @@ -73,14 +74,21 @@ DataPacket p4=fw.getForProducer(); assertNotNull(p4); fw.produce(); + fw.consumeData(); + + DataPacket p5=fw.getForProducer(); + assertNotNull(p5); + fw.produce(); + //which is again p1 - assertTrue(p4==p1); + assertTrue(p5==p1); } private volatile boolean fail=false; - public void testConcurrentReadWrite()throws InterruptedException{ + @Test + public void testConcurrentReadWrite_20()throws InterruptedException{ final FlowWindow fw=new FlowWindow(20, 64); Thread reader=new Thread(new Runnable(){ public void run(){ @@ -107,6 +115,34 @@ } + @Test + public void testConcurrentReadWrite_2()throws InterruptedException{ + final FlowWindow fw=new FlowWindow(2, 64); + Thread reader=new Thread(new Runnable(){ + public void run(){ + doRead(fw); + } + }); + reader.setName("reader"); + Thread writer=new Thread(new Runnable(){ + public void run(){ + doWrite(fw); + } + }); + writer.setName("writer"); + + writer.start(); + reader.start(); + + int c=0; + while(read && write && c<10){ + Thread.sleep(1000); + c++; + } + assertFalse("An error occured in reader or writer",fail); + + } + volatile boolean read=true; volatile boolean write=true; int N=100000; @@ -123,6 
+159,7 @@ } }catch(Throwable ex){ ex.printStackTrace(); + System.out.println(fw); fail=true; } System.out.println("Exiting reader...");

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.