[Udt-java-commits] SF.net SVN: udt-java:[24] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2010-04-25 20:32:29
|
Revision: 24 http://udt-java.svn.sourceforge.net/udt-java/?rev=24&view=rev Author: bschuller Date: 2010-04-25 20:32:22 +0000 (Sun, 25 Apr 2010) Log Message: ----------- simpler InputStream; fix wrong value for EXP interval Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTInputStream.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java udt-java/trunk/src/main/java/udt/util/UDTStatistics.java udt-java/trunk/src/test/java/udt/TestUDTInputStream.java udt-java/trunk/src/test/java/udt/UDTTestBase.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Added Paths: ----------- udt-java/trunk/src/main/java/udt/util/MeanValue.java Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-22 10:06:54 UTC (rev 23) +++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-25 20:32:22 UTC (rev 24) @@ -34,7 +34,6 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -66,13 +65,10 @@ //see the noMoreData() method private final AtomicBoolean expectMoreData=new AtomicBoolean(true); - - private final ByteBuffer buffer; - private volatile boolean closed=false; - + private volatile boolean blocking=true; - + /** * create a new {@link UDTInputStream} connected to the given socket * @param socket - the {@link UDTSocket} @@ -83,8 +79,6 @@ this.socket=socket; this.statistics=statistics; appData=new FlowWindow<AppData>(getFlowWindowSize()); - buffer=ByteBuffer.allocate(65536); - buffer.flip(); } private int getFlowWindowSize(){ @@ -107,7 +101,7 @@ int b=0; while(b==0) b=read(single); - + if(b>0){ return single[0]; } @@ -115,24 +109,39 @@ return b; } } - + private AppData currentChunk=null; + //offset into currentChunk int offset=0; + @Override public int read(byte[]target)throws IOException{ try{ - //empty the buffer first - int read=readFromBuffer(target, 0); - //if no more space left in target, exit now - if(read==target.length){ - return target.length; + int read=0; + updateCurrentChunk(); + while(currentChunk!=null){ + byte[]data=currentChunk.data; + int length=Math.min(target.length-read,data.length-offset); + System.arraycopy(data, offset, target, read, length); + read+=length; + offset+=length; + //check if chunk has been fully read + if(offset>=data.length){ + currentChunk=null; + offset=0; + } + + //if no more space left in target, exit now + if(read==target.length){ + return read; + } + + updateCurrentChunk(); } - //otherwise try to fill up the buffer - fillBuffer(); - read+=readFromBuffer(target, read); + if(read>0)return read; if(closed)return -1; - if(expectMoreData.get() || buffer.remaining()>0 || !appData.isEmpty())return 0; + if(expectMoreData.get() || !appData.isEmpty())return 0; //no more data return -1; @@ -143,20 +152,18 @@ } } - @Override - public int available()throws IOException{ - return buffer.remaining(); - } - /** - * write as much data into the ByteBuffer as possible<br/> + * Reads the next valid chunk of application data from the queue<br/> + * * In blocking mode,this method will block until data is available or the socket is closed, - * otherwise wait for at most 10 milliseconds. - * @returns <code>true</code> if data available + * otherwise it will wait for at most 10 milliseconds. + * * @throws InterruptedException */ - private boolean fillBuffer()throws IOException{ - if(currentChunk==null){ + private void updateCurrentChunk()throws IOException{ + if(currentChunk!=null)return; + + while(true){ try{ if(blocking){ currentChunk=appData.poll(1, TimeUnit.MILLISECONDS); @@ -170,51 +177,29 @@ ex.initCause(ie); throw ex; } - } - if(currentChunk!=null){ - //check if the data is in-order - if(currentChunk.sequenceNumber==highestSequenceNumber+1){ - highestSequenceNumber++; - statistics.updateReadDataMD5(currentChunk.data); + if(currentChunk!=null){ + //check if the data is in-order + if(currentChunk.sequenceNumber==highestSequenceNumber+1){ + highestSequenceNumber++; + statistics.updateReadDataMD5(currentChunk.data); + return; + } + else if(currentChunk.sequenceNumber<=highestSequenceNumber){ + //duplicate, drop it + currentChunk=null; + statistics.incNumberOfDuplicateDataPackets(); + } + else{ + //out of order data, put back into queue and exit + appData.offer(currentChunk); + currentChunk=null; + return; + } } - else if(currentChunk.sequenceNumber<=highestSequenceNumber){ - //duplicate, drop it - currentChunk=null; - statistics.incNumberOfDuplicateDataPackets(); - return false; - } - else{ - //out of order data, put back into queue - appData.offer(currentChunk); - currentChunk=null; - return false; - } - - //fill data into the buffer - buffer.compact(); - int len=Math.min(buffer.remaining(),currentChunk.data.length-offset); - buffer.put(currentChunk.data,offset,len); - buffer.flip(); - offset+=len; - //check if the chunk has been fully read - if(offset>=currentChunk.data.length){ - currentChunk=null; - offset=0; - } + else return; } - return true; } - //read data from the internal buffer into target at the specified offset - private int readFromBuffer(byte[] target, int offset){ - int available=buffer.remaining(); - int canRead=Math.min(available, target.length-offset); - if(canRead>0){ - buffer.get(target, offset, canRead); - } - return canRead; - } - /** * new application data * @param data @@ -231,7 +216,7 @@ closed=true; noMoreData(); } - + public UDTSocket getSocket(){ return socket; } @@ -243,7 +228,7 @@ public void setBlocking(boolean block){ this.blocking=block; } - + /** * notify the input stream that there is no more data * @throws IOException @@ -277,7 +262,7 @@ final int prime = 31; int result = 1; result = prime * result - + (int) (sequenceNumber ^ (sequenceNumber >>> 32)); + + (int) (sequenceNumber ^ (sequenceNumber >>> 32)); return result; } @@ -294,8 +279,8 @@ return false; return true; } - - + + } } Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-22 10:06:54 UTC (rev 23) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-25 20:32:22 UTC (rev 24) @@ -125,7 +125,7 @@ private long nextEXP; //microseconds to next EXP event - private long EXP_INTERVAL=Util.getSYNTime(); + private long EXP_INTERVAL=100*Util.getSYNTime(); //instant when the session was created (for expiry checking) private final long sessionUpSince; @@ -348,7 +348,7 @@ public static int dropRate=0; //number of received data packets private int n=0; - + protected void onDataPacketReceived(DataPacket dp)throws IOException{ long currentSequenceNumber = dp.getPacketSequenceNumber(); //check whether to drop this packet @@ -367,9 +367,11 @@ long interval=currentDataPacketArrivalTime -lastDataPacketArrivalTime; packetPairWindow.add(interval); } + //(5).record the packet arrival time in the PKT History Window. packetHistoryWindow.add(currentDataPacketArrivalTime); + //store current time lastDataPacketArrivalTime=currentDataPacketArrivalTime; Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-22 10:06:54 UTC (rev 23) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-25 20:32:22 UTC (rev 24) @@ -286,14 +286,12 @@ SenderLossListEntry entry=senderLossList.getFirstEntry(); if (entry!=null) { long seqNumber = entry.getSequenceNumber(); - //TODO //if the current seqNumber is 16n,check the timeOut in the //loss list and send a message drop request. //if((seqNumber%16)==0){ //sendLossList.checkTimeOut(timeToLive); //} - try { //retransmit the packet with the first entry in the list //as sequence number and remove it from the list @@ -301,8 +299,7 @@ if(pktToRetransmit!=null){ endpoint.doSend(pktToRetransmit); statistics.incNumberOfRetransmittedDataPackets(); - } - senderLossList.remove(seqNumber); + } }catch (Exception e) { logger.log(Level.WARNING,"",e); } @@ -352,6 +349,7 @@ * for processing EXP event (see spec. p 13) */ protected void putUnacknowledgedPacketsIntoLossList(){ + synchronized (sendLock) { for(Long l: sendBuffer.keySet()){ senderLossList.insert(new SenderLossListEntry(l)); Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-22 10:06:54 UTC (rev 23) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-25 20:32:22 UTC (rev 24) @@ -58,7 +58,8 @@ } /** - * gets the loss list entry with the lowest sequence number + * retrieves the loss list entry with the lowest sequence number and removes + * it from the loss list */ public SenderLossListEntry getFirstEntry(){ return backingList.poll(); @@ -68,4 +69,7 @@ return backingList.isEmpty(); } + public long size(){ + return backingList.size(); + } } Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-22 10:06:54 UTC (rev 23) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-25 20:32:22 UTC (rev 24) @@ -94,4 +94,7 @@ return true; } + public String toString(){ + return "lossListEntry-"+sequenceNumber; + } } Added: udt-java/trunk/src/main/java/udt/util/MeanValue.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/MeanValue.java (rev 0) +++ udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-04-25 20:32:22 UTC (rev 24) @@ -0,0 +1,38 @@ +package udt.util; + +import java.text.NumberFormat; + +/** + * holds a floating mean value + */ +public class MeanValue { + + private double mean=0; + + private int n=0; + + private final NumberFormat format; + + + public MeanValue(){ + format=NumberFormat.getNumberInstance(); + format.setMaximumFractionDigits(2); + } + public void addValue(double value){ + mean=(mean*n+value)/(n+1); + n++; + } + + public double getMean(){ + return mean; + } + + public String getFormattedMean(){ + return format.format(mean); + } + + public void clear(){ + mean=0; + n=0; + } +} Property changes on: udt-java/trunk/src/main/java/udt/util/MeanValue.java ___________________________________________________________________ Added: svn:mime-type + text/plain Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-22 10:06:54 UTC (rev 23) +++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-25 20:32:22 UTC (rev 24) @@ -150,6 +150,10 @@ this.sendPeriod=sendPeriod; } + public double getSendPeriod(){ + return sendPeriod; + } + public void updateReadDataMD5(byte[]data){ digest.update(data); } Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2010-04-22 10:06:54 UTC (rev 23) +++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2010-04-25 20:32:22 UTC (rev 24) @@ -13,30 +13,46 @@ byte[] data2="a test".getBytes(); byte[] data3=" string".getBytes(); String digest=computeMD5(data1,data2,data3); - is.haveNewData(0, data1); - is.haveNewData(1, data2); - is.haveNewData(2, data3); + is.haveNewData(1, data1); + is.haveNewData(2, data2); + is.haveNewData(3, data3); is.noMoreData(); is.setBlocking(false); readAll(is,8); assertEquals(digest,stat.getDigest()); } + public void test2()throws Exception{ + UDTStatistics stat=new UDTStatistics("test"); + UDTInputStream is=new UDTInputStream(null, stat); + byte[] data1=getRandomData(65537); + byte[] data2=getRandomData(1234); + byte[] data3=getRandomData(3*1024*1024); + String digest=computeMD5(data1,data2,data3); + is.setBlocking(false); + is.haveNewData(1, data1); + is.haveNewData(2, data2); + is.haveNewData(3, data3); + is.noMoreData(); + readAll(is,5*1024*1024); + assertEquals(digest,stat.getDigest()); + } + public void testInOrder()throws Exception{ UDTStatistics stat=new UDTStatistics("test"); UDTInputStream is=new UDTInputStream(null, stat); is.setBlocking(false); - byte[]data=getRandomData(10*1024); + byte[]data=getRandomData(10*1024*1024); byte[][]blocks=makeChunks(10,data); String digest=computeMD5(blocks); for(int i=0;i<10;i++){ - is.haveNewData(i, blocks[i]); + is.haveNewData(i+1, blocks[i]); } is.noMoreData(); - readAll(is,512); + readAll(is,1024*999); assertEquals(digest,stat.getDigest()); } @@ -52,7 +68,7 @@ byte[]order=new byte[]{9,7,5,3,1,2,0,4,6,8}; for(int i : order){ - is.haveNewData(i, blocks[i]); + is.haveNewData(i+1, blocks[i]); } readAll(is,512,true); @@ -70,7 +86,7 @@ is.noMoreData(); if(c==-1)break; else{ - d.update(buf,0,c); + if(c>0)d.update(buf,0,c); } } return UDTStatistics.hexString(d); Modified: udt-java/trunk/src/test/java/udt/UDTTestBase.java =================================================================== --- udt-java/trunk/src/test/java/udt/UDTTestBase.java 2010-04-22 10:06:54 UTC (rev 23) +++ udt-java/trunk/src/test/java/udt/UDTTestBase.java 2010-04-25 20:32:22 UTC (rev 24) @@ -5,6 +5,8 @@ import java.security.MessageDigest; import java.util.Random; +import udt.util.UDTStatistics; + import junit.framework.TestCase; /** @@ -58,14 +60,7 @@ } public static String hexString(MessageDigest digest){ - byte[] messageDigest = digest.digest(); - StringBuilder hexString = new StringBuilder(); - for (int i=0;i<messageDigest.length;i++) { - String hex = Integer.toHexString(0xFF & messageDigest[i]); - if(hex.length()==1)hexString.append('0'); - hexString.append(hex); - } - return hexString.toString(); + return UDTStatistics.hexString(digest); } Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-22 10:06:54 UTC (rev 23) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-25 20:32:22 UTC (rev 24) @@ -21,7 +21,7 @@ boolean running=false; //how many - int num_packets=200; + int num_packets=100; //how large is a single packet int size=1*1024*1024; @@ -64,7 +64,12 @@ client.sendBlocking(data); digest.update(data); double took=System.currentTimeMillis()-block; - System.out.println("Sent block <"+i+"> in "+took+" ms, rate: "+format.format(size/(1024*took))+ " Mbytes/sec"); + double arrival=client.getStatistics().getPacketArrivalRate(); + double snd=client.getStatistics().getSendPeriod(); + System.out.println("Sent block <"+i+"> in "+took+" ms, " + +" pktArr: "+arrival + + " snd: "+format.format(snd) + +" rate: "+format.format(size/(1024*took))+ " MB/sec"); } end=System.currentTimeMillis(); client.shutdown(); @@ -74,7 +79,7 @@ while(serverRunning)Thread.sleep(100); System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms"); - double mbytes=N/(end-start)/1024; + double mbytes=N/(end-start)/1024.0; double mbit=8*mbytes; System.out.println("Rate: "+format.format(mbytes)+" Mbytes/sec "+format.format(mbit)+" Mbit/sec"); System.out.println("Server received: "+total); @@ -117,7 +122,6 @@ else{ md5.update(buf, 0, c); total+=c; - Thread.yield(); } } System.out.println("Server thread exiting, last received bytes: "+c); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |