[Udt-java-commits] SF.net SVN: udt-java:[28] udt-java/trunk
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2010-04-28 14:52:04
|
Revision: 28 http://udt-java.svn.sourceforge.net/udt-java/?rev=28&view=rev Author: bschuller Date: 2010-04-28 14:51:57 +0000 (Wed, 28 Apr 2010) Log Message: ----------- some cleanup Modified Paths: -------------- udt-java/trunk/pom.xml udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTCongestionControl.java udt-java/trunk/src/main/java/udt/UDTInputStream.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/UDTSocket.java udt-java/trunk/src/main/java/udt/packets/Destination.java udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java udt-java/trunk/src/main/java/udt/util/MeanValue.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java udt-java/trunk/src/main/java/udt/util/SendFile.java udt-java/trunk/src/main/java/udt/util/Util.java udt-java/trunk/src/test/java/udt/TestList.java udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java Removed Paths: ------------- udt-java/trunk/src/main/java/udt/util/FlowWindow.java Modified: udt-java/trunk/pom.xml =================================================================== --- udt-java/trunk/pom.xml 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/pom.xml 2010-04-28 14:51:57 UTC (rev 28) @@ -5,7 +5,7 @@ <artifactId>udt-java</artifactId> <packaging>jar</packaging> <name>UDT Java implementation</name> - <version>0.2-SNAPSHOT</version> + <version>0.4-SNAPSHOT</version> <url>http://sourceforge.net/projects/udt-java</url> <developers> <developer> Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-28 14:51:57 UTC (rev 28) @@ -50,6 +50,7 @@ import udt.packets.ConnectionHandshake; import udt.packets.Destination; import udt.packets.PacketFactory; +import udt.util.MeanValue; import udt.util.UDTThreadFactory; /** @@ -82,7 +83,7 @@ private volatile boolean stopped=false; public static final int DATAGRAM_SIZE=1500; - + /** * bind to any local port on the given host address * @param localAddress @@ -113,6 +114,8 @@ sessionHandoff=new SynchronousQueue<UDTSession>(); //set a time out to avoid blocking in doReceive() dgSocket.setSoTimeout(1000); + //buffer size + dgSocket.setReceiveBufferSize(512*1024); } /** @@ -237,6 +240,7 @@ */ private long lastDestID=-1; private UDTSession lastSession; + MeanValue v=new MeanValue(true,64); protected void doReceive()throws IOException{ try{ try{ @@ -294,12 +298,11 @@ logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); } } - + protected void doSend(UDTPacket packet)throws IOException{ byte[]data=packet.getEncoded(); - Destination dest=packet.getSession().getDestination(); - DatagramPacket dgp = new DatagramPacket(data, data.length, - dest.getAddress() , dest.getPort()); + DatagramPacket dgp = packet.getSession().getDatagram(); + dgp.setData(data); dgSocket.send(dgp); } Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-28 14:51:57 UTC (rev 28) @@ -30,7 +30,7 @@ private long estimatedLinkCapacity=0; // Packet sending period = packet send interval, in microseconds - private double packetSendingPeriod=1; + private double packetSendingPeriod=0; // Congestion window size, in packets private long congestionWindowSize=16; @@ -228,7 +228,10 @@ // c. Record the current largest sent sequence number (LastDecSeq). lastDecreaseSeqNo= currentMaxSequenceNumber; } - + + //enforce upper limit on send period... + //packetSendingPeriod=Math.min(packetSendingPeriod, 2*roundTripTime); + statistics.setSendPeriod(packetSendingPeriod); return; } Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-28 14:51:57 UTC (rev 28) @@ -34,10 +34,10 @@ import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import udt.util.FlowWindow; import udt.util.UDTStatistics; /** @@ -54,7 +54,7 @@ //inbound application data, in-order, and ready for reading //by the application - private final FlowWindow<AppData>appData; + private final PriorityBlockingQueue<AppData>appData; private final UDTStatistics statistics; @@ -78,13 +78,9 @@ public UDTInputStream(UDTSocket socket, UDTStatistics statistics)throws IOException{ this.socket=socket; this.statistics=statistics; - appData=new FlowWindow<AppData>(getFlowWindowSize()); + appData=new PriorityBlockingQueue<AppData>(128); } - private int getFlowWindowSize(){ - if(socket!=null)return 2*socket.getSession().getFlowWindowSize(); - else return 128; - } /** * create a new {@link UDTInputStream} connected to the given socket * @param socket - the {@link UDTSocket} @@ -172,6 +168,7 @@ } } else currentChunk=appData.poll(10, TimeUnit.MILLISECONDS); + }catch(InterruptedException ie){ IOException ex=new IOException(); ex.initCause(ie); Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-28 14:51:57 UTC (rev 28) @@ -167,9 +167,7 @@ //incoming packets are ordered by sequence number, with control packets having //preference over data packets - handoffQueue=//new ArrayBlockingQueue<UDTPacket>(session.getFlowWindowSize()); - new PriorityBlockingQueue<UDTPacket>(session.getFlowWindowSize()); - + handoffQueue=new PriorityBlockingQueue<UDTPacket>(session.getFlowWindowSize()); start(); } @@ -198,8 +196,6 @@ /* * packets are written by the endpoint */ - long i=0; - long mean=0; protected void receive(UDTPacket p)throws IOException{ handoffQueue.offer(p); } @@ -247,6 +243,7 @@ } processUDTPacket(packet); } + Thread.yield(); } @@ -326,6 +323,7 @@ protected void processUDTPacket(UDTPacket p)throws IOException{ //(3).Check the packet type and process it according to this. + if(p instanceof DataPacket){ DataPacket dp=(DataPacket)p; onDataPacketReceived(dp); @@ -340,8 +338,6 @@ onShutdown(); } - //other packet types? - } //every nth packet will be discarded... for testing only of course @@ -375,11 +371,8 @@ //store current time lastDataPacketArrivalTime=currentDataPacketArrivalTime; - if(!session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData())){ - //no left space in application data buffer->drop this packet - return; - } - + session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData()); + //(6).number of detected lossed packet /*(6.a).if the number of the current data packet is greater than LSRN+1, put all the sequence numbers between (but excluding) these two values @@ -387,13 +380,11 @@ if(currentSequenceNumber>largestReceivedSeqNumber+1){ sendNAK(currentSequenceNumber); } - else{ - if(currentSequenceNumber<largestReceivedSeqNumber){ + else if(currentSequenceNumber<largestReceivedSeqNumber){ /*(6.b).if the sequence number is less than LRSN,remove it from * the receiver's loss list */ receiverLossList.remove(currentSequenceNumber); - } } statistics.incNumberOfReceivedDataPackets(); @@ -422,6 +413,7 @@ receiverLossList.insert(detectedLossSeqNumber); } endpoint.doSend(nAckPacket); + statistics.incNumberOfNAKSent(); } protected void sendNAK(List<Long>sequenceNumbers)throws IOException{ @@ -447,7 +439,7 @@ estimateLinkCapacity=packetPairWindow.getEstimatedLinkCapacity(); acknowledgmentPkt.setEstimatedLinkCapacity(estimateLinkCapacity); //set the packet arrival rate - packetArrivalSpeed=(long)packetHistoryWindow.getPacketArrivalSpeed(); + packetArrivalSpeed=packetHistoryWindow.getPacketArrivalSpeed(); acknowledgmentPkt.setPacketReceiveRate(packetArrivalSpeed); endpoint.doSend(acknowledgmentPkt); Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-28 14:51:57 UTC (rev 28) @@ -96,36 +96,46 @@ //last acknowledge number, initialised to the initial sequence number private long lastAckSequenceNumber; - //size of the send queue - public final int sendQueueLength; + private volatile boolean started=false; private volatile boolean stopped=false; - private volatile AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>(); + private final CountDownLatch startLatch=new CountDownLatch(1); - private volatile AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); + private final AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>(); + private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); + public UDTSender(UDTSession session,UDPEndPoint endpoint){ if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); this.endpoint= endpoint; this.session=session; - statistics=session.getStatistics(); - sendQueueLength=64;//session.getFlowWindowSize(); senderLossList=new SenderLossList(); - sendBuffer=new ConcurrentHashMap<Long, DataPacket>(sendQueueLength,0.75f,2); - sendQueue = new LinkedBlockingQueue<DataPacket>(sendQueueLength); + sendBuffer=new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(),0.75f,2); + sendQueue = new LinkedBlockingQueue<DataPacket>(1000); lastAckSequenceNumber=session.getInitialSequenceNumber(); waitForAckLatch.set(new CountDownLatch(1)); waitForSeqAckLatch.set(new CountDownLatch(1)); - start(); + doStart(); } + /** + * start the sender thread + */ + public void start(){ + logger.info("Starting sender for "+session); + startLatch.countDown(); + started=true; + } + //starts the sender algorithm - private void start(){ + private void doStart(){ Runnable r=new Runnable(){ public void run(){ try{ + //wait until explicitely started + startLatch.await(); while(!stopped){ senderAlgorithm(); } @@ -150,7 +160,7 @@ * @param data * @throws IOException * @throws InterruptedException - */ + */ private void send(DataPacket p)throws IOException{ synchronized(sendLock){ endpoint.doSend(p); @@ -161,15 +171,6 @@ } /** - * writes a data packet into the sendQueue - * @return <code>true</code>if the packet was added, <code>false</code> if the - * packet could not be added because the queue was full - */ - protected boolean sendUdtPacket(DataPacket p)throws IOException{ - return sendQueue.offer(p); - } - - /** * writes a data packet into the sendQueue, waiting at most for the specified time * if this is not possible due to a full send queue * @@ -183,6 +184,7 @@ * @throws InterruptedException */ protected boolean sendUdtPacket(DataPacket p, int timeout, TimeUnit units)throws IOException,InterruptedException{ + if(!started)start(); return sendQueue.offer(p,timeout,units); } @@ -244,7 +246,7 @@ */ protected void onNAKPacketReceived(NegativeAcknowledgement nak){ waitForAckLatch.get().countDown(); - + for(Integer i: nak.getDecodedLossInfo()){ senderLossList.insert(new SenderLossListEntry(i)); } @@ -252,12 +254,12 @@ session.getSocket().getReceiver().resetEXPTimer(); statistics.incNumberOfNAKReceived(); statistics.storeParameters(); - + if(logger.isLoggable(Level.FINER)){ logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, " - +"set send period to "+session.getCongestionControl().getSendInterval()); + +"set send period to "+session.getCongestionControl().getSendInterval()); } - + return; } @@ -282,56 +284,45 @@ */ public void senderAlgorithm()throws InterruptedException, IOException{ long iterationStart=Util.getCurrentTime(); + //if the sender's loss list is not empty SenderLossListEntry entry=senderLossList.getFirstEntry(); if (entry!=null) { - long seqNumber = entry.getSequenceNumber(); - //TODO - //if the current seqNumber is 16n,check the timeOut in the - //loss list and send a message drop request. - //if((seqNumber%16)==0){ - //sendLossList.checkTimeOut(timeToLive); - //} - try { - //retransmit the packet with the first entry in the list - //as sequence number and remove it from the list - DataPacket pktToRetransmit = sendBuffer.get(seqNumber); - if(pktToRetransmit!=null){ - endpoint.doSend(pktToRetransmit); - statistics.incNumberOfRetransmittedDataPackets(); - } - }catch (Exception e) { - logger.log(Level.WARNING,"",e); - } - // return; + handleResubmit(entry); } - - //if the number of unacknowledged data packets does not exceed the congestion - //and the flow window sizes, pack a new packet - int unAcknowledged=unacknowledged.get(); - if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() - && unAcknowledged<session.getFlowWindowSize()){ - - if(sendQueue.size()==0){ - //Thread.yield(); + else + { + //if the number of unacknowledged data packets does not exceed the congestion + //and the flow window sizes, pack a new packet + int unAcknowledged=unacknowledged.get(); + + if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() + && unAcknowledged<session.getFlowWindowSize()){ + //check for application data + DataPacket dp=sendQueue.poll();//10*Util.getSYNTime(),TimeUnit.MICROSECONDS); + if(dp!=null){ + send(dp); + largestSentSequenceNumber=dp.getPacketSequenceNumber(); + } + else { + Thread.yield(); + return; + } + }else{ + //congestion window full, should we *really* wait for an ack?! + if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){ + statistics.incNumberOfCCWindowExceededEvents(); + } + Thread.sleep(1); + //waitForAck(); return; } - DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS); - if(dp!=null){ - send(dp); - largestSentSequenceNumber=dp.getPacketSequenceNumber(); - } - }else{ - //should we *really* wait for an ack?! - if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){ - statistics.incNumberOfCCWindowExceededEvents(); - } - Thread.sleep(1); - //waitForAck(); } //wait + + double snd=session.getCongestionControl().getSendInterval(); long passed=Util.getCurrentTime()-iterationStart; int x=0; @@ -339,17 +330,42 @@ if(x++==0)statistics.incNumberOfCCSlowDownEvents(); //we cannot wait with microsecond precision if(snd-passed>750)Thread.sleep(1); - else Thread.yield(); + else if((snd-passed)/snd > 0.9){ + return; + } passed=Util.getCurrentTime()-iterationStart; } - + } /** + * re-submits an entry from the sender loss list + * @param entry + */ + protected void handleResubmit(SenderLossListEntry entry){ + long seqNumber = entry.getSequenceNumber(); + //TODO + //if the current seqNumber is 16n,check the timeOut in the + //loss list and send a message drop request. + //if((seqNumber%16)==0){ + //sendLossList.checkTimeOut(timeToLive); + //} + try { + //retransmit the packet and remove it from the list + DataPacket pktToRetransmit = sendBuffer.get(seqNumber); + if(pktToRetransmit!=null){ + endpoint.doSend(pktToRetransmit); + statistics.incNumberOfRetransmittedDataPackets(); + } + }catch (Exception e) { + logger.log(Level.WARNING,"",e); + } + } + + /** * for processing EXP event (see spec. p 13) */ protected void putUnacknowledgedPacketsIntoLossList(){ - synchronized (sendLock) { for(Long l: sendBuffer.keySet()){ senderLossList.insert(new SenderLossListEntry(l)); @@ -391,7 +407,7 @@ return largestSentSequenceNumber>=sequenceNumber; } - + boolean haveLostPackets(){ return !senderLossList.isEmpty(); } Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-28 14:51:57 UTC (rev 28) @@ -32,6 +32,7 @@ package udt; +import java.net.DatagramPacket; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -64,11 +65,14 @@ protected final CongestionControl cc; + //cache dgPacket (peer stays the same always) + private DatagramPacket dgPacket; + /** * flow window size, i.e. how many data packets are * in-flight at a single time */ - protected int flowWindowSize=1024; + protected int flowWindowSize=64; /** * remote UDT entity (address and socket ID) @@ -105,7 +109,7 @@ statistics=new UDTStatistics(description); mySocketID=nextSocketID.incrementAndGet(); this.destination=destination; - + this.dgPacket=new DatagramPacket(new byte[0],0,destination.getAddress(),destination.getPort()); //init configurable CC String clazzP=System.getProperty(CC_CLASS,UDTCongestionControl.class.getName()); Object ccObject=null; @@ -210,4 +214,7 @@ this.initialSequenceNumber=initialSequenceNumber; } + public DatagramPacket getDatagram(){ + return dgPacket; + } } Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-28 14:51:57 UTC (rev 28) @@ -148,7 +148,7 @@ */ protected void doWrite(byte[]data, int offset, int length)throws IOException{ try{ - doWrite(data, offset, length, 5, TimeUnit.MILLISECONDS); + doWrite(data, offset, length, Integer.MAX_VALUE, TimeUnit.MILLISECONDS); }catch(InterruptedException ie){ IOException io=new IOException(); io.initCause(ie); @@ -163,14 +163,13 @@ * @param length * @param timeout * @param units - * @throws IOException + * @throws IOException - if data cannot be sent * @throws InterruptedException */ protected void doWrite(byte[]data, int offset, int length, int timeout, TimeUnit units)throws IOException,InterruptedException{ int chunksize=session.getDatagramSize()-24;//need some bytes for the header ByteBuffer bb=ByteBuffer.wrap(data,offset,length); long seqNo=0; - int i=0; while(bb.remaining()>0){ int len=Math.min(bb.remaining(),chunksize); byte[]chunk=new byte[len]; @@ -182,10 +181,9 @@ packet.setDestinationID(session.getDestination().getSocketID()); packet.setData(chunk); //put the packet into the send queue - while(!sender.sendUdtPacket(packet, timeout, units)){ - Thread.sleep(1); + if(!sender.sendUdtPacket(packet, timeout, units)){ + throw new IOException("Queue full"); } - i++; } if(length>0)active=true; } Modified: udt-java/trunk/src/main/java/udt/packets/Destination.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/Destination.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/packets/Destination.java 2010-04-28 14:51:57 UTC (rev 28) @@ -33,7 +33,6 @@ package udt.packets; import java.net.InetAddress; -import java.net.UnknownHostException; public class Destination { @@ -49,7 +48,7 @@ this.port=port; } - public InetAddress getAddress()throws UnknownHostException{ + public InetAddress getAddress(){ return address; } Modified: udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2010-04-28 14:51:57 UTC (rev 28) @@ -34,13 +34,8 @@ import udt.util.CircularArray; - - /** * A circular array that records the packet arrival times - * - * - * */ public class PacketHistoryWindow extends CircularArray<Long>{ @@ -57,7 +52,7 @@ * (see specification section 6.2, page 12) * @return the current value */ - public double getPacketArrivalSpeed(){ + public long getPacketArrivalSpeed(){ if(!haveOverflow)return 0; int num=max-1; double AI; @@ -94,7 +89,7 @@ else{ medianPacketArrivalSpeed=0; } - return medianPacketArrivalSpeed; + return (long)Math.ceil(medianPacketArrivalSpeed); } } Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-28 14:51:57 UTC (rev 28) @@ -72,4 +72,8 @@ public long size(){ return backingList.size(); } + + public String toString(){ + return backingList.toString(); + } } Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-28 14:51:57 UTC (rev 28) @@ -95,6 +95,6 @@ } public String toString(){ - return "lossListEntry-"+sequenceNumber; + return "lost-"+sequenceNumber; } } Deleted: udt-java/trunk/src/main/java/udt/util/FlowWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-28 14:51:57 UTC (rev 28) @@ -1,87 +0,0 @@ -/********************************************************************************* - * Copyright (c) 2010 Forschungszentrum Juelich GmbH - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * (1) Redistributions of source code must retain the above copyright notice, - * this list of conditions and the disclaimer at the end. Redistributions in - * binary form must reproduce the above copyright notice, this list of - * conditions and the following disclaimer in the documentation and/or other - * materials provided with the distribution. - * - * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its - * contributors may be used to endorse or promote products derived from this - * software without specific prior written permission. - * - * DISCLAIMER - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - *********************************************************************************/ - -package udt.util; - -import java.util.concurrent.PriorityBlockingQueue; - -/** - * bounded queue - * - */ -public class FlowWindow<E> extends PriorityBlockingQueue<E> { - - private static final long serialVersionUID=1l; - - private volatile int capacity; - - /** - * create a new flow window with the given size - * - * @param size - the initial size of the flow window - */ - public FlowWindow(int size){ - super(); - this.capacity=size; - } - - /** - * create a new flow window with the default size of 16 - */ - public FlowWindow(){ - this(16); - } - - public void setCapacity(int newSize){ - capacity=newSize; - } - - public int getCapacity(){ - return capacity; - } - - /** - * try to add an element to the queue, return false if it is not possible - */ - @Override - public boolean offer(E e) { - if(contains(e)){ - return true; - } - if(size()<capacity){ - return super.offer(e); - }else return false; - } - - - -} Modified: udt-java/trunk/src/main/java/udt/util/MeanValue.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-04-28 14:51:57 UTC (rev 28) @@ -13,14 +13,36 @@ private final NumberFormat format; + private final boolean verbose; + private final long nValue; + private long start; + + private String msg; + public MeanValue(){ + this(false, 64); + } + + public MeanValue(boolean verbose){ + this(verbose, 64); + } + + public MeanValue(boolean verbose, int nValue){ format=NumberFormat.getNumberInstance(); format.setMaximumFractionDigits(2); + this.verbose=verbose; + this.nValue=nValue; + begin(); } + public void addValue(double value){ mean=(mean*n+value)/(n+1); n++; + if(verbose && n % nValue == 1){ + if(msg!=null)System.out.print(msg+" "); + System.out.println(getFormattedMean()); + } } public double getMean(){ @@ -35,4 +57,16 @@ mean=0; n=0; } + + public void begin(){ + start=Util.getCurrentTime(); + } + + public void end(){ + addValue(Util.getCurrentTime()-start); + } + public void end(String msg){ + this.msg=msg; + addValue(Util.getCurrentTime()-start); + } } Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-28 14:51:57 UTC (rev 28) @@ -108,6 +108,9 @@ System.out.println("[ReceiveFile] Rate: "+(int)mbytes+" MBytes/sec. "+(int)mbit+" MBit/sec."); client.shutdown(); + + if(verbose)System.out.println(client.getStatistics()); + }finally{ fos.close(); } Modified: udt-java/trunk/src/main/java/udt/util/SendFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-28 14:51:57 UTC (rev 28) @@ -151,7 +151,7 @@ out.write(PacketUtil.encode(size)); long start=System.currentTimeMillis(); //and send the file - Util.copy(fis, out, size, true); + Util.copy(fis, out, size, false); long end=System.currentTimeMillis(); System.out.println(socket.getSession().getStatistics().toString()); double rate=1000.0*size/1024/1024/(end-start); Modified: udt-java/trunk/src/main/java/udt/util/Util.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-28 14:51:57 UTC (rev 28) @@ -121,7 +121,7 @@ * @throws IOException */ public static void copy(InputStream source, OutputStream target, long size, boolean flush)throws IOException{ - byte[]buf=new byte[1*1024*1024]; + byte[]buf=new byte[65536]; int c; long read=0; while(true){ @@ -149,5 +149,5 @@ p.setPort(clientPort); endpoint.sendRaw(p); } - + } Modified: udt-java/trunk/src/test/java/udt/TestList.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestList.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/test/java/udt/TestList.java 2010-04-28 14:51:57 UTC (rev 28) @@ -12,7 +12,6 @@ import udt.sender.SenderLossList; import udt.sender.SenderLossListEntry; import udt.util.CircularArray; -import udt.util.FlowWindow; /* * tests for the various list and queue classes @@ -33,27 +32,16 @@ c.add(11); System.out.println(c); } - - public void testFlowWindow(){ - FlowWindow<Long>f=new FlowWindow<Long>(5); - for(int i=0;i<5;i++){ - System.out.println(i); - assertTrue(f.add(Long.valueOf(i))); - } - assertFalse(f.add(0l)); - f.setCapacity(6); - assertTrue(f.add(0l)); - } - + public void testPacketHistoryWindow(){ PacketHistoryWindow packetHistoryWindow = new PacketHistoryWindow(16); - - for(int i=0;i<17;i++){ - packetHistoryWindow.add(i*5000l); + long offset=1000000; + for(int i=0;i<28;i++){ + packetHistoryWindow.add(offset+i*5000l); } //packets arrive every 5 ms, so packet arrival rate is 200/sec - assertEquals(200.0,packetHistoryWindow.getPacketArrivalSpeed()); + assertEquals(200,packetHistoryWindow.getPacketArrivalSpeed()); } @@ -109,6 +97,9 @@ d1.setPacketSequenceNumber(1); DataPacket d2=new DataPacket(); d2.setPacketSequenceNumber(2); + DataPacket d3=new DataPacket(); + d3.setPacketSequenceNumber(3); + q.offer(d3); q.offer(d2); q.offer(d1); q.offer(control); @@ -116,16 +107,28 @@ UDTPacket p1=q.poll(); assertTrue(p1.isControlPacket()); - UDTPacket p2=q.poll(); - assertFalse(p2.isControlPacket()); + UDTPacket p=q.poll(); + assertFalse(p.isControlPacket()); //check ordering by sequence number - assertEquals(1,p2.getPacketSequenceNumber()); + assertEquals(1,p.getPacketSequenceNumber()); - UDTPacket p3=q.poll(); - assertFalse(p3.isControlPacket()); - assertEquals(2,p3.getPacketSequenceNumber()); + DataPacket d=new DataPacket(); + d.setPacketSequenceNumber(54); + q.offer(d); + p=q.poll(); + assertFalse(p.isControlPacket()); + assertEquals(2,p.getPacketSequenceNumber()); + p=q.poll(); + assertFalse(p.isControlPacket()); + assertEquals(3,p.getPacketSequenceNumber()); + + p=q.poll(); + assertFalse(p.isControlPacket()); + assertEquals(54,p.getPacketSequenceNumber()); + + } } Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-28 14:51:57 UTC (rev 28) @@ -17,7 +17,8 @@ Thread.sleep(500); }while(!serverStarted); - File f=new File("src/test/java/datafile"); + //File f=new File("src/test/java/datafile"); + File f=new File("/tmp/100MB"); File tmp=File.createTempFile("udtest-", null); Modified: udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2010-04-28 14:51:57 UTC (rev 28) @@ -29,7 +29,6 @@ //set an artificial loss rate public void testWithLoss()throws Exception{ UDTReceiver.dropRate=3; - UDTReceiver.connectionExpiryDisabled=true; TIMEOUT=Integer.MAX_VALUE; num_packets=512; //set log level @@ -40,7 +39,6 @@ //send even more data public void testLargeDataSet()throws Exception{ UDTReceiver.dropRate=0; - UDTReceiver.connectionExpiryDisabled=true; TIMEOUT=Integer.MAX_VALUE; num_packets=3*1024; //set log level Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-28 14:51:57 UTC (rev 28) @@ -21,7 +21,7 @@ boolean running=false; //how many - int num_packets=100; + int num_packets=50; //how large is a single packet int size=1*1024*1024; @@ -35,7 +35,6 @@ //System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName()); UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; - //UDTReceiver.connectionExpiryDisabled=true; doTest(); } @@ -56,12 +55,12 @@ MessageDigest digest=MessageDigest.getInstance("MD5"); while(!serverRunning)Thread.sleep(100); long start=System.currentTimeMillis(); - System.out.println("Sending <"+num_packets+"> packets of <"+size/1024/1024+"> Mbytes each"); + System.out.println("Sending <"+num_packets+"> packets of <"+format.format(size/1024.0/1024.0)+"> Mbytes each"); long end=0; if(serverRunning){ for(int i=0;i<num_packets;i++){ long block=System.currentTimeMillis(); - client.sendBlocking(data); + client.send(data); digest.update(data); double took=System.currentTimeMillis()-block; double arrival=client.getStatistics().getPacketArrivalRate(); @@ -71,6 +70,7 @@ + " snd: "+format.format(snd) +" rate: "+format.format(size/(1024*took))+ " MB/sec"); } + client.flush(); end=System.currentTimeMillis(); client.shutdown(); }else throw new IllegalStateException(); Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-26 06:36:47 UTC (rev 27) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-28 14:51:57 UTC (rev 28) @@ -7,13 +7,14 @@ import junit.framework.TestCase; import udt.UDPEndPoint; +import udt.util.MeanValue; /** * send some data over a UDP connection and measure performance */ public class UDPTest extends TestCase { - final int num_packets=5*1000; + final int num_packets=10*1000; final int packetSize=UDPEndPoint.DATAGRAM_SIZE; public void test1()throws Exception{ @@ -30,9 +31,12 @@ dp.setAddress(InetAddress.getByName("localhost")); dp.setPort(65321); System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes"); + MeanValue v=new MeanValue(); for(int i=0;i<num_packets;i++){ dp.setData(data); + v.begin(); s.send(dp); + v.end(); } System.out.println("Finished sending."); while(serverRunning)Thread.sleep(10); @@ -41,6 +45,7 @@ System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms"); System.out.println("Rate "+N/1000/(end-start)+" Mbytes/sec"); System.out.println("Rate "+num_packets+" packets/sec"); + System.out.println("Mean send time "+v.getFormattedMean()+" microsec"); System.out.println("Server received: "+total); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |