[Udt-java-commits] SF.net SVN: udt-java:[53] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2011-01-06 16:13:38
|
Revision: 53 http://udt-java.svn.sourceforge.net/udt-java/?rev=53&view=rev Author: bschuller Date: 2011-01-06 16:13:32 +0000 (Thu, 06 Jan 2011) Log Message: ----------- Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTClient.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/util/MeanValue.java udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-01-06 16:13:32 UTC (rev 53) @@ -117,12 +117,17 @@ if(localPort>0)this.port = localPort; else port=dgSocket.getLocalPort(); + configureSocket(); + } + + protected void configureSocket()throws SocketException{ //set a time out to avoid blocking in doReceive() dgSocket.setSoTimeout(100000); //buffer size dgSocket.setReceiveBufferSize(128*1024); + dgSocket.setReuseAddress(false); } - + /** * bind to the default network interface on the machine * @@ -237,8 +242,6 @@ private long lastDestID=-1; private UDTSession lastSession; - //MeanValue v=new MeanValue("receiver processing ",true, 256); - private int n=0; private final Object lock=new Object(); @@ -247,13 +250,10 @@ while(!stopped){ try{ try{ - //v.end(); //will block until a packet is received or timeout has expired dgSocket.receive(dp); - //v.begin(); - Destination peer=new Destination(dp.getAddress(), dp.getPort()); int l=dp.getLength(); UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); Modified: udt-java/trunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTClient.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/UDTClient.java 2011-01-06 16:13:32 UTC (rev 53) @@ -93,14 +93,19 @@ /** * sends the given data asynchronously * - * @param data + * @param data - the data to send * @throws IOException - * @throws InterruptedException */ - public void send(byte[]data)throws IOException, InterruptedException{ + public void send(byte[]data)throws IOException{ clientSession.getSocket().doWrite(data); } + /** + * sends the given data and waits for acknowledgement + * @param data - the data to send + * @throws IOException + * @throws InterruptedException if interrupted while waiting for ack + */ public void sendBlocking(byte[]data)throws IOException, InterruptedException{ clientSession.getSocket().doWriteBlocking(data); } Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-01-06 16:13:32 UTC (rev 53) @@ -183,13 +183,13 @@ private MeanValue dataProcessTime; private void initMetrics(){ if(!storeStatistics)return; - dgReceiveInterval=new MeanValue("UDT receive interval"); + dgReceiveInterval=new MeanValue("RECEIVER: UDT receive interval"); statistics.addMetric(dgReceiveInterval); - dataPacketInterval=new MeanValue("Data packet interval"); + dataPacketInterval=new MeanValue("RECEIVER: Data packet interval"); statistics.addMetric(dataPacketInterval); - processTime=new MeanValue("UDT packet process time"); + processTime=new MeanValue("RECEIVER: UDT packet process time"); statistics.addMetric(processTime); - dataProcessTime=new MeanValue("Data packet process time"); + dataProcessTime=new MeanValue("RECEIVER: Data packet process time"); statistics.addMetric(dataProcessTime); } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-01-06 16:13:32 UTC (rev 53) @@ -125,7 +125,7 @@ statistics=session.getStatistics(); senderLossList=new SenderLossList(); sendBuffer=new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(),0.75f,2); - sendQueue = new ArrayBlockingQueue<DataPacket>(1000); + sendQueue = new ArrayBlockingQueue<DataPacket>(session.getFlowWindowSize(), /*fairness*/ true); lastAckSequenceNumber=session.getInitialSequenceNumber(); currentSequenceNumber=session.getInitialSequenceNumber()-1; waitForAckLatch.set(new CountDownLatch(1)); @@ -140,11 +140,11 @@ private MeanThroughput throughput; private void initMetrics(){ if(!storeStatistics)return; - dgSendTime=new MeanValue("Datagram send time"); + dgSendTime=new MeanValue("SENDER: Datagram send time"); statistics.addMetric(dgSendTime); - dgSendInterval=new MeanValue("Datagram send interval"); + dgSendInterval=new MeanValue("SENDER: Datagram send interval"); statistics.addMetric(dgSendInterval); - throughput=new MeanThroughput("Throughput", session.getDatagramSize()); + throughput=new MeanThroughput("SENDER: Throughput", session.getDatagramSize()); statistics.addMetric(throughput); } @@ -338,7 +338,7 @@ if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() && unAcknowledged<session.getFlowWindowSize()){ //check for application data - DataPacket dp=sendQueue.poll(Util.SYN,TimeUnit.MICROSECONDS); + DataPacket dp=sendQueue.poll(); if(dp!=null){ send(dp); largestSentSequenceNumber=dp.getPacketSequenceNumber(); Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-01-06 16:13:32 UTC (rev 53) @@ -33,6 +33,8 @@ package udt.sender; import java.util.LinkedList; +import udt.util.MeanValue; + /** * stores the sequence number of the lost packets in increasing order */ @@ -49,15 +51,15 @@ public void insert(Long obj){ synchronized (backingList) { - if(!backingList.contains(obj)){ - for(int i=0;i<backingList.size();i++){ - if(obj<backingList.get(i)){ - backingList.add(i,obj); - return; - } + for(int i=0;i<backingList.size();i++){ + Long entry=backingList.get(i); + if(obj<entry){ + backingList.add(i,obj); + return; } - backingList.add(obj); + else if(obj==entry)return; } + backingList.add(obj); } } @@ -69,7 +71,7 @@ return backingList.poll(); } } - + public boolean isEmpty(){ return backingList.isEmpty(); } Modified: udt-java/trunk/src/main/java/udt/util/MeanValue.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/util/MeanValue.java 2011-01-06 16:13:32 UTC (rev 53) @@ -9,6 +9,8 @@ public class MeanValue { private double mean=0; + private double max=0; + private double min=0; private int n=0; @@ -43,9 +45,15 @@ public void addValue(double value){ mean=(mean*n+value)/(n+1); n++; + max=Math.max(max, value); + min=Math.min(max, value); + if(verbose && n % nValue == 0){ - if(msg!=null)System.out.println(msg+" "+getFormattedMean()); - else System.out.println(name+getFormattedMean()); + if(msg!=null)System.out.println(msg+" "+get()); + else System.out.println(name+" "+get()); + + max=0; + min=0; } } @@ -57,6 +65,10 @@ return format.format(getMean()); } + public String get(){ + return format.format(getMean())+" max="+format.format(max)+" min="+format.format(min); + } + public void clear(){ mean=0; n=0; Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-01-06 16:13:32 UTC (rev 53) @@ -44,7 +44,6 @@ lock=new ReentrantLock(false); notEmpty=lock.newCondition(); highestReadSequenceNumber=SequenceNumber.decrement(initialSequenceNumber); - System.out.println("SIZE: "+size); } public boolean offer(AppData data){ @@ -121,14 +120,6 @@ } else return null; } - // else{ - // System.out.println("empty HEAD at pos="+readPosition); - // try{ - // Thread.sleep(1000); - // Thread.yield(); - // }catch(InterruptedException e){}; - // } - return r; } Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-01-06 16:13:32 UTC (rev 53) @@ -104,6 +104,12 @@ } long size=decode(sizeInfo, 0); + Boolean devNull=Boolean.getBoolean("udt.dev.null"); + if(devNull){ + while(true)Thread.sleep(10000); + } + + File file=new File(new String(localFile)); System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">"); FileOutputStream fos=new FileOutputStream(file); Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-01-06 16:13:32 UTC (rev 53) @@ -21,7 +21,7 @@ boolean running=false; //how many - int num_packets=200; + int num_packets=300; //how large is a single packet int size=1*1024*1024; @@ -32,6 +32,8 @@ public void test1()throws Exception{ Logger.getLogger("udt").setLevel(Level.INFO); +// System.setProperty("udt.receiver.storeStatistics","true"); +// System.setProperty("udt.sender.storeStatistics","true"); UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; doTest(); @@ -109,6 +111,10 @@ Runnable serverProcess=new Runnable(){ public void run(){ try{ + Boolean devNull=Boolean.getBoolean("udt.dev.null"); + if(devNull){ + while(true)Thread.sleep(10000); + } MessageDigest md5=MessageDigest.getInstance("MD5"); long start=System.currentTimeMillis(); UDTSocket s=serverSocket.accept(); Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2011-01-06 16:13:32 UTC (rev 53) @@ -23,8 +23,10 @@ public void test1()throws Exception{ runServer(); runThirdThread(); + //client socket DatagramSocket s=new DatagramSocket(12345); + //generate a test array with random content N=num_packets*packetSize; byte[]data=new byte[packetSize]; @@ -34,32 +36,29 @@ dp.setAddress(InetAddress.getByName("localhost")); dp.setPort(65321); System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes"); - MeanValue v=new MeanValue("Datagram send time",false); - MeanValue v2=new MeanValue("Datagram send interval",false); - MeanValue v3=new MeanValue("Encoding time",false); + MeanValue dgSendTime=new MeanValue("Datagram send time",false); + MeanValue dgSendInterval=new MeanValue("Datagram send interval",false); for(int i=0;i<num_packets;i++){ DataPacket p=new DataPacket(); p.setData(data); - v3.begin(); dp.setData(p.getEncoded()); - v3.end(); - v2.end(); - v.begin(); + dgSendInterval.end(); + dgSendTime.begin(); s.send(dp); - v.end(); - v2.begin(); + dgSendTime.end(); + dgSendInterval.begin(); } System.out.println("Finished sending."); while(serverRunning)Thread.sleep(10); System.out.println("Server stopped."); long end=System.currentTimeMillis(); System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms"); - System.out.println("Rate "+N/1000/(end-start)+" Mbytes/sec"); + float rate=N/1000/(end-start); + System.out.println("Rate "+rate+" Mbytes/sec "+(rate*8)+ " Mbit/sec"); System.out.println("Rate "+num_packets+" packets/sec"); - System.out.println("Mean send time "+v.getFormattedMean()+" microsec"); - System.out.println("Mean send interval "+v2.getFormattedMean()+" microsec"); - System.out.println("Datapacket encoding time "+v3.getFormattedMean()+" microsec"); + System.out.println("Mean send time "+dgSendTime.get()); + System.out.println("Mean send interval "+dgSendInterval.get()); System.out.println("Server received: "+total); } @@ -79,6 +78,7 @@ while(true){ serverSocket.receive(dp); handoff.offer(dp); + total+=dp.getLength(); } } catch(Exception e){ @@ -117,5 +117,5 @@ t.start(); } - + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |