[Udt-java-commits] SF.net SVN: udt-java:[50] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2010-09-17 11:04:46
|
Revision: 50
http://udt-java.svn.sourceforge.net/udt-java/?rev=50&view=rev
Author: bschuller
Date: 2010-09-17 11:04:39 +0000 (Fri, 17 Sep 2010)
Log Message:
-----------
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
udt-java/trunk/src/main/java/udt/util/Util.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -386,14 +386,14 @@
protected void onDataPacketReceived(DataPacket dp)throws IOException{
long currentSequenceNumber = dp.getPacketSequenceNumber();
- //check whether to drop this packet
+ //for TESTING : check whether to drop this packet
// n++;
// //if(dropRate>0 && n % dropRate == 0){
-// if(n==666){
-// logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING");
-// return;
-// }
-//
+// if(n % 1111 == 0){
+// logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING");
+// return;
+// }
+// //}
boolean OK=session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData());
if(!OK){
//need to drop packet...
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -61,6 +61,7 @@
int insert=offset% size;
buffer[insert]=data;
numValidChunks.incrementAndGet();
+ notEmpty.signal();
return true;
}finally{
lock.unlock();
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -32,8 +32,10 @@
package udt.util;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.text.NumberFormat;
@@ -101,22 +103,16 @@
total+=r;
}
long size=decode(sizeInfo, 0);
- if(verbose){
- StringBuilder sb=new StringBuilder();
- for(int i=0;i<sizeInfo.length;i++){
- sb.append(Integer.toString(sizeInfo[i]));
- sb.append(" ");
- }
- System.out.println("[ReceiveFile] Size info: "+sb.toString());
- }
+
File file=new File(new String(localFile));
System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">");
FileOutputStream fos=new FileOutputStream(file);
+ OutputStream os=new BufferedOutputStream(fos,1024*1024);
try{
System.out.println("[ReceiveFile] Reading <"+size+"> bytes.");
long start = System.currentTimeMillis();
//and read the file data
- Util.copy(in, fos, size, false);
+ Util.copy(in, os, size, false);
long end = System.currentTimeMillis();
double rate=1000.0*size/1024/1024/(end-start);
System.out.println("[ReceiveFile] Rate: "+format.format(rate)+" MBytes/sec. "
Modified: udt-java/trunk/src/main/java/udt/util/Util.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/Util.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -134,6 +134,7 @@
c=source.read(buf);
if(c<0)break;
read+=c;
+ //System.out.println("writing <"+c+"> bytes");
target.write(buf, 0, c);
if(flush)target.flush();
if(read>=size && size>-1)break;
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -21,7 +21,7 @@
boolean running=false;
//how many
- int num_packets=500;
+ int num_packets=200;
//how large is a single packet
int size=1*1024*1024;
Modified: udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -16,79 +16,79 @@
byte[]test1="test1".getBytes();
byte[]test2="test2".getBytes();
byte[]test3="test3".getBytes();
-
+
b.offer(new AppData(1l,test1));
b.offer(new AppData(2l,test2));
b.offer(new AppData(3l,test3));
-
+
AppData a=b.poll();
assertEquals(1l,a.getSequenceNumber());
-
+
a=b.poll();
assertEquals(2l,a.getSequenceNumber());
-
+
a=b.poll();
assertEquals(3l,a.getSequenceNumber());
-
+
assertNull(b.poll());
}
-
+
public void testOutOfOrder(){
ReceiveBuffer b=new ReceiveBuffer(16,1);
byte[]test1="test1".getBytes();
byte[]test2="test2".getBytes();
byte[]test3="test3".getBytes();
-
+
b.offer(new AppData(3l,test3));
b.offer(new AppData(2l,test2));
b.offer(new AppData(1l,test1));
-
+
AppData a=b.poll();
assertEquals(1l,a.getSequenceNumber());
-
+
a=b.poll();
assertEquals(2l,a.getSequenceNumber());
-
+
a=b.poll();
assertEquals(3l,a.getSequenceNumber());
-
+
assertNull(b.poll());
}
-
+
public void testInterleaved(){
ReceiveBuffer b=new ReceiveBuffer(16,1);
byte[]test1="test1".getBytes();
byte[]test2="test2".getBytes();
byte[]test3="test3".getBytes();
-
+
b.offer(new AppData(3l,test3));
-
+
b.offer(new AppData(1l,test1));
-
+
AppData a=b.poll();
assertEquals(1l,a.getSequenceNumber());
-
+
assertNull(b.poll());
-
+
b.offer(new AppData(2l,test2));
-
+
a=b.poll();
assertEquals(2l,a.getSequenceNumber());
-
+
a=b.poll();
assertEquals(3l,a.getSequenceNumber());
}
-
+
public void testOverflow(){
ReceiveBuffer b=new ReceiveBuffer(4,1);
-
+
for(int i=0; i<3; i++){
b.offer(new AppData(i+1,"test".getBytes()));
}
for(int i=0; i<3; i++){
assertEquals(i+1, b.poll().getSequenceNumber());
}
-
+
for(int i=0; i<3; i++){
b.offer(new AppData(i+4,"test".getBytes()));
}
@@ -96,13 +96,13 @@
assertEquals(i+4, b.poll().getSequenceNumber());
}
}
-
-
+
+
public void testTimedPoll()throws Exception{
final ReceiveBuffer b=new ReceiveBuffer(4,1);
-
+
Runnable write=new Runnable(){
-
+
public void run(){
try{
for(int i=0; i<5; i++){
@@ -115,7 +115,7 @@
}
}
};
-
+
Callable<String> reader=new Callable<String>(){
public String call() throws Exception {
for(int i=0; i<5; i++){
@@ -131,12 +131,60 @@
return "OK.";
}
};
+
+ ScheduledExecutorService es=Executors.newScheduledThreadPool(2);
+ es.execute(write);
+ Future<String>res=es.submit(reader);
+ res.get();
+ es.shutdownNow();
+ }
+
+
+ volatile boolean poll=false;
+
+ public void testTimedPoll2()throws Exception{
+ final ReceiveBuffer b=new ReceiveBuffer(4,1);
+ Runnable write=new Runnable(){
+
+ public void run(){
+ try{
+ Thread.sleep(2979);
+ System.out.println("PUT");
+ while(!poll)Thread.sleep(10);
+ b.offer(new AppData(1,"test".getBytes()));
+ System.out.println("... PUT OK");
+ }
+ catch(Exception e){
+ e.printStackTrace();
+ fail();
+ }
+ }
+ };
+
+ Callable<String> reader=new Callable<String>(){
+ public String call() throws Exception {
+ AppData r=null;
+ do{
+ try{
+ poll=true;
+ System.out.println("POLL");
+ r=b.poll(1000, TimeUnit.MILLISECONDS);
+ poll=false;
+ if(r!=null)System.out.println("... POLL OK");
+ else System.out.println("... nothing.");
+ }catch(InterruptedException ie){
+ ie.printStackTrace();
+ }
+ }while(r==null);
+ return "OK.";
+ }
+ };
+
ScheduledExecutorService es=Executors.newScheduledThreadPool(2);
es.execute(write);
Future<String>res=es.submit(reader);
res.get();
es.shutdownNow();
}
-
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|