[Udt-java-commits] SF.net SVN: udt-java:[70] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2012-02-03 09:09:43
|
Revision: 70
http://udt-java.svn.sourceforge.net/udt-java/?rev=70&view=rev
Author: bschuller
Date: 2012-02-03 09:09:37 +0000 (Fri, 03 Feb 2012)
Log Message:
-----------
roll back change to FlowWindow; add some comments to make it easier to understand
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java
udt-java/trunk/src/test/java/udt/performance/UDPTest.java
udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
Modified: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2012-02-03 06:57:00 UTC (rev 69)
+++ udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2012-02-03 09:09:37 UTC (rev 70)
@@ -6,10 +6,12 @@
/**
*
- * holds a fixed number of {@link DataPacket} instances which are sent out.
+ * Holds a fixed number of {@link DataPacket} instances which are sent out.<br/>
*
- * it is assumed that a single thread stores new data, and another single thread
- * reads/removes data
+ * It is assumed that a single thread (the producer) stores new data,
+ * and another single thread (the consumer) reads/removes data.<br/>
+ *
+ *
*
* @author schuller
*/
@@ -23,12 +25,15 @@
private volatile boolean isFull=false;
+ //valid entries that can be read
private volatile int validEntries=0;
private volatile boolean isCheckout=false;
+ //index where the next data packet will be written to
private volatile int writePos=0;
+ //one before the index where the next data packet will be read from
private volatile int readPos=-1;
private volatile int consumed=0;
@@ -42,7 +47,7 @@
* @param chunksize - data chunk size
*/
public FlowWindow(int size, int chunksize){
- this.length=size;
+ this.length=size+1;
packets=new DataPacket[length];
for(int i=0;i<packets.length;i++){
packets[i]=new DataPacket();
@@ -64,21 +69,25 @@
}
if(isCheckout)throw new IllegalStateException();
isCheckout=true;
- DataPacket p=packets[writePos];
- return p;
+ return packets[writePos];
}finally{
lock.unlock();
}
}
+ /**
+ * notify the flow window that the data packet obtained by {@link #getForProducer()}
+ * has been filled with data and is ready for sending out
+ */
public void produce(){
lock.lock();
try{
+ if(!isCheckout)throw new IllegalStateException();
isCheckout=false;
writePos++;
if(writePos==length)writePos=0;
validEntries++;
- isFull=validEntries==length;
+ isFull=validEntries==length-1;
isEmpty=false;
produced++;
}finally{
@@ -88,11 +97,11 @@
public DataPacket consumeData(){
- if(isEmpty){
- return null;
- }
lock.lock();
try{
+ if(isEmpty){
+ return null;
+ }
readPos++;
DataPacket p=packets[readPos];
if(readPos==length-1)readPos=-1;
@@ -133,6 +142,7 @@
StringBuilder sb=new StringBuilder();
sb.append("FlowWindow size=").append(length);
sb.append(" full=").append(isFull).append(" empty=").append(isEmpty);
+ sb.append(" readPos=").append(readPos).append(" writePos=").append(writePos);
sb.append(" consumed=").append(consumed).append(" produced=").append(produced);
return sb.toString();
}
Modified: udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2012-02-03 06:57:00 UTC (rev 69)
+++ udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2012-02-03 09:09:37 UTC (rev 70)
@@ -25,9 +25,9 @@
@Test
public void testWithoutLoss()throws Exception{
- Logger.getLogger("udt").setLevel(Level.WARNING);
+ Logger.getLogger("udt").setLevel(Level.INFO);
UDTReceiver.dropRate=0;
- num_packets=640;
+ num_packets=1000;
TIMEOUT=Integer.MAX_VALUE;
doTest();
}
@@ -37,9 +37,9 @@
public void testWithLoss()throws Exception{
UDTReceiver.dropRate=3;
TIMEOUT=Integer.MAX_VALUE;
- num_packets=512;
+ num_packets=100;
//set log level
- Logger.getLogger("udt").setLevel(Level.WARNING);
+ Logger.getLogger("udt").setLevel(Level.INFO);
doTest();
}
@@ -48,9 +48,9 @@
public void testLargeDataSet()throws Exception{
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
- num_packets=3*1024;
+ num_packets=100;
//set log level
- Logger.getLogger("udt").setLevel(Level.WARNING);
+ Logger.getLogger("udt").setLevel(Level.INFO);
doTest();
}
Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2012-02-03 06:57:00 UTC (rev 69)
+++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2012-02-03 09:09:37 UTC (rev 70)
@@ -6,6 +6,7 @@
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
@@ -18,7 +19,7 @@
*/
public class UDPTest {
- final int num_packets=10*10*1000;
+ final int num_packets=1000;
final int packetSize=UDPEndPoint.DATAGRAM_SIZE;
@Test
@@ -48,6 +49,7 @@
dgSendInterval.end();
dgSendTime.begin();
s.send(dp);
+ Thread.sleep(5);
dgSendTime.end();
dgSendInterval.begin();
}
@@ -76,11 +78,10 @@
public void run(){
try{
byte[]buf=new byte[packetSize];
- DatagramPacket dp=new DatagramPacket(buf,buf.length);
while(true){
+ DatagramPacket dp=new DatagramPacket(buf,buf.length);
serverSocket.receive(dp);
handoff.offer(dp);
- total+=dp.getLength();
}
}
catch(Exception e){
@@ -91,6 +92,7 @@
};
Thread t=new Thread(serverProcess);
t.start();
+ System.out.println("Server started.");
}
private final BlockingQueue<DatagramPacket> handoff=new SynchronousQueue<DatagramPacket>();
@@ -99,11 +101,15 @@
Runnable serverProcess=new Runnable(){
public void run(){
try{
+ int counter=0;
long start=System.currentTimeMillis();
- while(true){
- DatagramPacket dp=handoff.poll();
- if(dp!=null)total+=dp.getLength();
- if(total==N)break;
+ while(counter<num_packets){
+ DatagramPacket dp=handoff.poll(10, TimeUnit.MILLISECONDS);
+ if(dp!=null){
+ total+=dp.getLength();
+ counter++;
+ System.out.println("Count: "+counter);
+ }
}
long end=System.currentTimeMillis();
System.out.println("Server time: "+(end-start)+" ms.");
@@ -117,6 +123,7 @@
};
Thread t=new Thread(serverProcess);
t.start();
+ System.out.println("Hand-off thread started.");
}
Modified: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
===================================================================
--- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2012-02-03 06:57:00 UTC (rev 69)
+++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2012-02-03 09:09:37 UTC (rev 70)
@@ -33,6 +33,7 @@
DataPacket no=fw.getForProducer();
assertNull("Window should be full",no);
+ assertTrue(fw.isFull());
DataPacket c1=fw.consumeData();
//must be p1
@@ -73,14 +74,21 @@
DataPacket p4=fw.getForProducer();
assertNotNull(p4);
fw.produce();
+ fw.consumeData();
+
+ DataPacket p5=fw.getForProducer();
+ assertNotNull(p5);
+ fw.produce();
+
//which is again p1
- assertTrue(p4==p1);
+ assertTrue(p5==p1);
}
private volatile boolean fail=false;
- public void testConcurrentReadWrite()throws InterruptedException{
+ @Test
+ public void testConcurrentReadWrite_20()throws InterruptedException{
final FlowWindow fw=new FlowWindow(20, 64);
Thread reader=new Thread(new Runnable(){
public void run(){
@@ -107,6 +115,34 @@
}
+ @Test
+ public void testConcurrentReadWrite_2()throws InterruptedException{
+ final FlowWindow fw=new FlowWindow(2, 64);
+ Thread reader=new Thread(new Runnable(){
+ public void run(){
+ doRead(fw);
+ }
+ });
+ reader.setName("reader");
+ Thread writer=new Thread(new Runnable(){
+ public void run(){
+ doWrite(fw);
+ }
+ });
+ writer.setName("writer");
+
+ writer.start();
+ reader.start();
+
+ int c=0;
+ while(read && write && c<10){
+ Thread.sleep(1000);
+ c++;
+ }
+ assertFalse("An error occured in reader or writer",fail);
+
+ }
+
volatile boolean read=true;
volatile boolean write=true;
int N=100000;
@@ -123,6 +159,7 @@
}
}catch(Throwable ex){
ex.printStackTrace();
+ System.out.println(fw);
fail=true;
}
System.out.println("Exiting reader...");
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|