[Udt-java-commits] SF.net SVN: udt-java:[50] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2010-09-17 11:04:46
|
Revision: 50 http://udt-java.svn.sourceforge.net/udt-java/?rev=50&view=rev Author: bschuller Date: 2010-09-17 11:04:39 +0000 (Fri, 17 Sep 2010) Log Message: ----------- Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java udt-java/trunk/src/main/java/udt/util/Util.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-17 11:04:39 UTC (rev 50) @@ -386,14 +386,14 @@ protected void onDataPacketReceived(DataPacket dp)throws IOException{ long currentSequenceNumber = dp.getPacketSequenceNumber(); - //check whether to drop this packet + //for TESTING : check whether to drop this packet // n++; // //if(dropRate>0 && n % dropRate == 0){ -// if(n==666){ -// logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING"); -// return; -// } -// +// if(n % 1111 == 0){ +// logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING"); +// return; +// } +// //} boolean OK=session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData()); if(!OK){ //need to drop packet... Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-09-17 11:04:39 UTC (rev 50) @@ -61,6 +61,7 @@ int insert=offset% size; buffer[insert]=data; numValidChunks.incrementAndGet(); + notEmpty.signal(); return true; }finally{ lock.unlock(); Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-09-17 11:04:39 UTC (rev 50) @@ -32,8 +32,10 @@ package udt.util; +import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; +import java.io.OutputStream; import java.net.InetAddress; import java.text.NumberFormat; @@ -101,22 +103,16 @@ total+=r; } long size=decode(sizeInfo, 0); - if(verbose){ - StringBuilder sb=new StringBuilder(); - for(int i=0;i<sizeInfo.length;i++){ - sb.append(Integer.toString(sizeInfo[i])); - sb.append(" "); - } - System.out.println("[ReceiveFile] Size info: "+sb.toString()); - } + File file=new File(new String(localFile)); System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">"); FileOutputStream fos=new FileOutputStream(file); + OutputStream os=new BufferedOutputStream(fos,1024*1024); try{ System.out.println("[ReceiveFile] Reading <"+size+"> bytes."); long start = System.currentTimeMillis(); //and read the file data - Util.copy(in, fos, size, false); + Util.copy(in, os, size, false); long end = System.currentTimeMillis(); double rate=1000.0*size/1024/1024/(end-start); System.out.println("[ReceiveFile] Rate: "+format.format(rate)+" MBytes/sec. " Modified: udt-java/trunk/src/main/java/udt/util/Util.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/Util.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-09-17 11:04:39 UTC (rev 50) @@ -134,6 +134,7 @@ c=source.read(buf); if(c<0)break; read+=c; + //System.out.println("writing <"+c+"> bytes"); target.write(buf, 0, c); if(flush)target.flush(); if(read>=size && size>-1)break; Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-17 11:04:39 UTC (rev 50) @@ -21,7 +21,7 @@ boolean running=false; //how many - int num_packets=500; + int num_packets=200; //how large is a single packet int size=1*1024*1024; Modified: udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java =================================================================== --- udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java 2010-09-17 11:04:39 UTC (rev 50) @@ -16,79 +16,79 @@ byte[]test1="test1".getBytes(); byte[]test2="test2".getBytes(); byte[]test3="test3".getBytes(); - + b.offer(new AppData(1l,test1)); b.offer(new AppData(2l,test2)); b.offer(new AppData(3l,test3)); - + AppData a=b.poll(); assertEquals(1l,a.getSequenceNumber()); - + a=b.poll(); assertEquals(2l,a.getSequenceNumber()); - + a=b.poll(); assertEquals(3l,a.getSequenceNumber()); - + assertNull(b.poll()); } - + public void testOutOfOrder(){ ReceiveBuffer b=new ReceiveBuffer(16,1); byte[]test1="test1".getBytes(); byte[]test2="test2".getBytes(); byte[]test3="test3".getBytes(); - + b.offer(new AppData(3l,test3)); b.offer(new AppData(2l,test2)); b.offer(new AppData(1l,test1)); - + AppData a=b.poll(); assertEquals(1l,a.getSequenceNumber()); - + a=b.poll(); assertEquals(2l,a.getSequenceNumber()); - + a=b.poll(); assertEquals(3l,a.getSequenceNumber()); - + assertNull(b.poll()); } - + public void testInterleaved(){ ReceiveBuffer b=new ReceiveBuffer(16,1); byte[]test1="test1".getBytes(); byte[]test2="test2".getBytes(); byte[]test3="test3".getBytes(); - + b.offer(new AppData(3l,test3)); - + b.offer(new AppData(1l,test1)); - + AppData a=b.poll(); assertEquals(1l,a.getSequenceNumber()); - + assertNull(b.poll()); - + b.offer(new AppData(2l,test2)); - + a=b.poll(); assertEquals(2l,a.getSequenceNumber()); - + a=b.poll(); assertEquals(3l,a.getSequenceNumber()); } - + public void testOverflow(){ ReceiveBuffer b=new ReceiveBuffer(4,1); - + for(int i=0; i<3; i++){ b.offer(new AppData(i+1,"test".getBytes())); } for(int i=0; i<3; i++){ assertEquals(i+1, b.poll().getSequenceNumber()); } - + for(int i=0; i<3; i++){ b.offer(new AppData(i+4,"test".getBytes())); } @@ -96,13 +96,13 @@ assertEquals(i+4, b.poll().getSequenceNumber()); } } - - + + public void testTimedPoll()throws Exception{ final ReceiveBuffer b=new ReceiveBuffer(4,1); - + Runnable write=new Runnable(){ - + public void run(){ try{ for(int i=0; i<5; i++){ @@ -115,7 +115,7 @@ } } }; - + Callable<String> reader=new Callable<String>(){ public String call() throws Exception { for(int i=0; i<5; i++){ @@ -131,12 +131,60 @@ return "OK."; } }; + + ScheduledExecutorService es=Executors.newScheduledThreadPool(2); + es.execute(write); + Future<String>res=es.submit(reader); + res.get(); + es.shutdownNow(); + } + + + volatile boolean poll=false; + + public void testTimedPoll2()throws Exception{ + final ReceiveBuffer b=new ReceiveBuffer(4,1); + Runnable write=new Runnable(){ + + public void run(){ + try{ + Thread.sleep(2979); + System.out.println("PUT"); + while(!poll)Thread.sleep(10); + b.offer(new AppData(1,"test".getBytes())); + System.out.println("... PUT OK"); + } + catch(Exception e){ + e.printStackTrace(); + fail(); + } + } + }; + + Callable<String> reader=new Callable<String>(){ + public String call() throws Exception { + AppData r=null; + do{ + try{ + poll=true; + System.out.println("POLL"); + r=b.poll(1000, TimeUnit.MILLISECONDS); + poll=false; + if(r!=null)System.out.println("... POLL OK"); + else System.out.println("... nothing."); + }catch(InterruptedException ie){ + ie.printStackTrace(); + } + }while(r==null); + return "OK."; + } + }; + ScheduledExecutorService es=Executors.newScheduledThreadPool(2); es.execute(write); Future<String>res=es.submit(reader); res.get(); es.shutdownNow(); } - } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |