[Udt-java-commits] SF.net SVN: udt-java:[30] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2010-04-29 15:26:45
|
Revision: 30 http://udt-java.svn.sourceforge.net/udt-java/?rev=30&view=rev Author: bschuller Date: 2010-04-29 15:26:39 +0000 (Thu, 29 Apr 2010) Log Message: ----------- minor performance improvements; add metric code for easier measuring of performance Modified Paths: -------------- udt-java/trunk/src/main/java/udt/ClientSession.java udt-java/trunk/src/main/java/udt/ServerSession.java udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTCongestionControl.java udt-java/trunk/src/main/java/udt/UDTInputStream.java udt-java/trunk/src/main/java/udt/UDTPacket.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java udt-java/trunk/src/main/java/udt/packets/ControlPacket.java udt-java/trunk/src/main/java/udt/packets/DataPacket.java udt-java/trunk/src/main/java/udt/packets/KeepAlive.java udt-java/trunk/src/main/java/udt/packets/MessageDropRequest.java udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java udt-java/trunk/src/main/java/udt/packets/PacketFactory.java udt-java/trunk/src/main/java/udt/packets/Shutdown.java udt-java/trunk/src/main/java/udt/packets/UserDefined.java udt-java/trunk/src/main/java/udt/util/MeanValue.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java udt-java/trunk/src/main/java/udt/util/UDTStatistics.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java Modified: udt-java/trunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ClientSession.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/ClientSession.java 2010-04-29 15:26:39 UTC (rev 30) @@ -80,6 +80,7 @@ logger.info("Connected, "+n+" handshake packets sent"); } + @Override public void received(UDTPacket packet, Destination peer) { @@ -99,6 +100,7 @@ return; } if(getState() == ready) { + if(packet instanceof Shutdown){ setState(shutdown); active=false; Modified: udt-java/trunk/src/main/java/udt/ServerSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ServerSession.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/ServerSession.java 2010-04-29 15:26:39 UTC (rev 30) @@ -43,6 +43,7 @@ import udt.packets.Destination; import udt.packets.KeepAlive; import udt.packets.Shutdown; +import udt.util.MeanValue; /** * server side session in client-server mode @@ -67,7 +68,6 @@ @Override public void received(UDTPacket packet, Destination peer){ lastPacket=packet; - if (getState()<=ready && packet instanceof ConnectionHandshake) { logger.info("Received ConnectionHandshake from "+peer); ConnectionHandshake connectionHandshake=(ConnectionHandshake)packet; Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-29 15:26:39 UTC (rev 30) @@ -83,7 +83,7 @@ private volatile boolean stopped=false; public static final int DATAGRAM_SIZE=1500; - + /** * bind to any local port on the given host address * @param localAddress @@ -113,9 +113,9 @@ clientSessions=new ConcurrentHashMap<Destination, UDTSession>(); sessionHandoff=new SynchronousQueue<UDTSession>(); //set a time out to avoid blocking in doReceive() - dgSocket.setSoTimeout(1000); + dgSocket.setSoTimeout(100000); //buffer size - dgSocket.setReceiveBufferSize(512*1024); + dgSocket.setReceiveBufferSize(128*1024); } /** @@ -151,9 +151,7 @@ Runnable receive=new Runnable(){ public void run(){ try{ - while(!stopped){ - doReceive(); - } + doReceive(); }catch(Exception ex){ ex.printStackTrace(); } @@ -240,65 +238,68 @@ */ private long lastDestID=-1; private UDTSession lastSession; - MeanValue v=new MeanValue(true,64); + + MeanValue v=new MeanValue("",false); + protected void doReceive()throws IOException{ - try{ + while(!stopped){ try{ - //will block until a packet is received or timeout has expired - dgSocket.receive(dp); - Destination peer=new Destination(dp.getAddress(), dp.getPort()); - int l=dp.getLength(); - UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); - lastPacket=packet; + try{ + //will block until a packet is received or timeout has expired + dgSocket.receive(dp); + + Destination peer=new Destination(dp.getAddress(), dp.getPort()); + int l=dp.getLength(); + UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); + lastPacket=packet; - //handle connection handshake - if(packet.isConnectionHandshake()){ - UDTSession session=clientSessions.get(peer); - - if(session==null){ - session=new ServerSession(dp,this); - addSession(session.getSocketID(),session); - //TODO need to check peer to avoid duplicate server session - if(serverSocketMode){ - logger.fine("Pooling new request."); - sessionHandoff.put(session); - logger.fine("Request taken for processing."); + //handle connection handshake + if(packet.isConnectionHandshake()){ + UDTSession session=clientSessions.get(peer); + if(session==null){ + session=new ServerSession(dp,this); + addSession(session.getSocketID(),session); + //TODO need to check peer to avoid duplicate server session + if(serverSocketMode){ + logger.fine("Pooling new request."); + sessionHandoff.put(session); + logger.fine("Request taken for processing."); + } } + peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); + session.received(packet,peer); } - peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); - session.received(packet,peer); - } - - else{ - //dispatch to existing session - long dest=packet.getDestinationID(); - UDTSession session; - if(dest==lastDestID){ - session=lastSession; - } else{ - session=sessions.get(dest); - lastSession=session; - lastDestID=dest; + //dispatch to existing session + long dest=packet.getDestinationID(); + UDTSession session; + if(dest==lastDestID){ + session=lastSession; + } + else{ + session=sessions.get(dest); + lastSession=session; + lastDestID=dest; + } + if(session==null){ + logger.warning("Unknown session <"+packet.getDestinationID()+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); + } + else{ + session.received(packet,peer); + } } - if(session==null){ - logger.warning("Unknown session <"+packet.getDestinationID()+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); - } - else{ - session.received(packet,peer); - } + }catch(SocketException ex){ + logger.log(Level.INFO, "SocketException: "+ex.getMessage()); + }catch(SocketTimeoutException ste){ + //can safely ignore... we will retry until the endpoint is stopped } - }catch(SocketException ex){ - logger.log(Level.INFO, "SocketException: "+ex.getMessage()); - }catch(SocketTimeoutException ste){ - //can safely ignore... we will retry until the endpoint is stopped + + }catch(Exception ex){ + logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); } - - }catch(Exception ex){ - logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); } } - + protected void doSend(UDTPacket packet)throws IOException{ byte[]data=packet.getEncoded(); DatagramPacket dgp = packet.getSession().getDatagram(); Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-29 15:26:39 UTC (rev 30) @@ -30,7 +30,7 @@ private long estimatedLinkCapacity=0; // Packet sending period = packet send interval, in microseconds - private double packetSendingPeriod=0; + private double packetSendingPeriod=1; // Congestion window size, in packets private long congestionWindowSize=16; @@ -229,9 +229,6 @@ lastDecreaseSeqNo= currentMaxSequenceNumber; } - //enforce upper limit on send period... - //packetSendingPeriod=Math.min(packetSendingPeriod, 2*roundTripTime); - statistics.setSendPeriod(packetSendingPeriod); return; } Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-29 15:26:39 UTC (rev 30) @@ -78,7 +78,7 @@ public UDTInputStream(UDTSocket socket, UDTStatistics statistics)throws IOException{ this.socket=socket; this.statistics=statistics; - appData=new PriorityBlockingQueue<AppData>(128); + appData=new PriorityBlockingQueue<AppData>(socket.getSession().getFlowWindowSize()); } /** @@ -178,7 +178,7 @@ //check if the data is in-order if(currentChunk.sequenceNumber==highestSequenceNumber+1){ highestSequenceNumber++; - statistics.updateReadDataMD5(currentChunk.data); + //statistics.updateReadDataMD5(currentChunk.data); return; } else if(currentChunk.sequenceNumber<=highestSequenceNumber){ Modified: udt-java/trunk/src/main/java/udt/UDTPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTPacket.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/UDTPacket.java 2010-04-29 15:26:39 UTC (rev 30) @@ -48,6 +48,8 @@ public boolean isControlPacket(); + public int getControlPacketType(); + /** * header * @return Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-29 15:26:39 UTC (rev 30) @@ -53,6 +53,7 @@ import udt.receiver.PacketPairWindow; import udt.receiver.ReceiverLossList; import udt.receiver.ReceiverLossListEntry; +import udt.util.MeanValue; import udt.util.UDTStatistics; import udt.util.UDTThreadFactory; import udt.util.Util; @@ -168,9 +169,26 @@ //incoming packets are ordered by sequence number, with control packets having //preference over data packets handoffQueue=new PriorityBlockingQueue<UDTPacket>(session.getFlowWindowSize()); + initMetrics(); start(); } + + private MeanValue dgReceiveInterval; + private MeanValue dataPacketInterval; + private MeanValue processTime; + private MeanValue dataProcessTime; + private void initMetrics(){ + dgReceiveInterval=new MeanValue("UDT receive interval"); + statistics.addMetric(dgReceiveInterval); + dataPacketInterval=new MeanValue("Data packet interval"); + statistics.addMetric(dataPacketInterval); + processTime=new MeanValue("UDT packet process time"); + statistics.addMetric(processTime); + dataProcessTime=new MeanValue("Data packet process time"); + statistics.addMetric(dataProcessTime); + } + //starts the sender algorithm private void start(){ Runnable r=new Runnable(){ @@ -197,7 +215,9 @@ * packets are written by the endpoint */ protected void receive(UDTPacket p)throws IOException{ + dgReceiveInterval.end(); handoffQueue.offer(p); + dgReceiveInterval.begin(); } /** @@ -222,7 +242,6 @@ nextEXP=currentTime+EXP_INTERVAL; processEXPEvent(); } - //perform time-bounded UDP receive UDTPacket packet=handoffQueue.poll(Util.getSYNTime(), TimeUnit.MICROSECONDS); if(packet!=null){ @@ -233,7 +252,7 @@ boolean needEXPReset=false; if(packet.isControlPacket()){ ControlPacket cp=(ControlPacket)packet; - int cpType=cp.getControlPaketType(); + int cpType=cp.getControlPacketType(); if(cpType==ControlPacketType.ACK.ordinal() || cpType==ControlPacketType.NAK.ordinal()){ needEXPReset=true; } @@ -241,7 +260,9 @@ if(needEXPReset){ nextEXP=Util.getCurrentTime()+EXP_INTERVAL; } + processTime.begin(); processUDTPacket(packet); + processTime.end(); } Thread.yield(); @@ -324,12 +345,16 @@ protected void processUDTPacket(UDTPacket p)throws IOException{ //(3).Check the packet type and process it according to this. - if(p instanceof DataPacket){ + if(!p.isControlPacket()){ DataPacket dp=(DataPacket)p; + dataPacketInterval.end(); + dataProcessTime.begin(); onDataPacketReceived(dp); + dataProcessTime.end(); + dataPacketInterval.begin(); } - else if (p instanceof Acknowledgment2){ + else if (p.getControlPacketType()==ControlPacketType.ACK2.ordinal()){ Acknowledgment2 ack2=(Acknowledgment2)p; onAck2PacketReceived(ack2); } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-29 15:26:39 UTC (rev 30) @@ -51,6 +51,7 @@ import udt.packets.NegativeAcknowledgement; import udt.sender.SenderLossList; import udt.sender.SenderLossListEntry; +import udt.util.MeanValue; import udt.util.UDTStatistics; import udt.util.UDTThreadFactory; import udt.util.Util; @@ -117,9 +118,19 @@ lastAckSequenceNumber=session.getInitialSequenceNumber(); waitForAckLatch.set(new CountDownLatch(1)); waitForSeqAckLatch.set(new CountDownLatch(1)); + initMetrics(); doStart(); } + private MeanValue dgSendTime; + private MeanValue dgSendInterval; + private void initMetrics(){ + dgSendTime=new MeanValue("Datagram send time"); + statistics.addMetric(dgSendTime); + dgSendInterval=new MeanValue("Datagram send interval"); + statistics.addMetric(dgSendInterval); + } + /** * start the sender thread */ @@ -136,9 +147,7 @@ try{ //wait until explicitely started startLatch.await(); - while(!stopped){ - senderAlgorithm(); - } + senderAlgorithm(); }catch(InterruptedException ie){ ie.printStackTrace(); } @@ -163,7 +172,11 @@ */ private void send(DataPacket p)throws IOException{ synchronized(sendLock){ + dgSendInterval.end(); + dgSendTime.begin(); endpoint.doSend(p); + dgSendTime.end(); + dgSendInterval.begin(); sendBuffer.put(p.getPacketSequenceNumber(), p); unacknowledged.incrementAndGet(); } @@ -206,7 +219,7 @@ protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{ waitForAckLatch.get().countDown(); waitForSeqAckLatch.get().countDown(); - + CongestionControl cc=session.getCongestionControl(); long rtt=acknowledgement.getRoundTripTime(); if(rtt>0){ @@ -223,6 +236,7 @@ long ackNumber=acknowledgement.getAckNumber(); cc.onACK(ackNumber); + statistics.setCongestionWindowSize(cc.getCongestionWindowSize()); //need to remove all sequence numbers up the ack number from the sendBuffer boolean removed=false; for(long s=lastAckSequenceNumber;s<ackNumber;s++){ @@ -246,7 +260,7 @@ */ protected void onNAKPacketReceived(NegativeAcknowledgement nak){ waitForAckLatch.get().countDown(); - + for(Integer i: nak.getDecodedLossInfo()){ senderLossList.insert(new SenderLossListEntry(i)); } @@ -282,60 +296,61 @@ /** * sender algorithm */ + MeanValue v=new MeanValue("",true); public void senderAlgorithm()throws InterruptedException, IOException{ - long iterationStart=Util.getCurrentTime(); - - //if the sender's loss list is not empty - SenderLossListEntry entry=senderLossList.getFirstEntry(); - if (entry!=null) { - handleResubmit(entry); - } - - else - { - //if the number of unacknowledged data packets does not exceed the congestion - //and the flow window sizes, pack a new packet - int unAcknowledged=unacknowledged.get(); - - if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() - && unAcknowledged<session.getFlowWindowSize()){ - //check for application data - DataPacket dp=sendQueue.poll();//10*Util.getSYNTime(),TimeUnit.MICROSECONDS); - if(dp!=null){ - send(dp); - largestSentSequenceNumber=dp.getPacketSequenceNumber(); + while(!stopped){ + + long iterationStart=Util.getCurrentTime(); //last packet send time? + + //if the sender's loss list is not empty + SenderLossListEntry entry=senderLossList.getFirstEntry(); + if (entry!=null) { + v.begin(); + handleResubmit(entry); + v.end(); + } + + else + { + //if the number of unacknowledged data packets does not exceed the congestion + //and the flow window sizes, pack a new packet + int unAcknowledged=unacknowledged.get(); + + if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() + && unAcknowledged<session.getFlowWindowSize()){ + //check for application data + DataPacket dp=sendQueue.poll(); + if(dp!=null){ + send(dp); + largestSentSequenceNumber=dp.getPacketSequenceNumber(); + } + else{ + statistics.incNumberOfMissingDataEvents(); + } + }else{ + //congestion window full, should we *really* wait for an ack?! + if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){ + statistics.incNumberOfCCWindowExceededEvents(); + } + waitForAck(); } - else { - Thread.yield(); - return; - } - }else{ - //congestion window full, should we *really* wait for an ack?! - if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){ - statistics.incNumberOfCCWindowExceededEvents(); - } - Thread.sleep(1); - //waitForAck(); - return; } - } - - //wait - - double snd=session.getCongestionControl().getSendInterval(); - long passed=Util.getCurrentTime()-iterationStart; - int x=0; - while(snd-passed>0){ - if(x++==0)statistics.incNumberOfCCSlowDownEvents(); - //we cannot wait with microsecond precision - if(snd-passed>750)Thread.sleep(1); - else if((snd-passed)/snd > 0.9){ - return; + //wait + if(largestSentSequenceNumber % 16 !=0){ + double snd=100;//session.getCongestionControl().getSendInterval(); + long passed=Util.getCurrentTime()-iterationStart; + int x=0; + while(snd-passed>0){ + //can't wait with microsecond precision :( + if(x==0){ + statistics.incNumberOfCCSlowDownEvents(); + x++; + } + passed=Util.getCurrentTime()-iterationStart; + } } - passed=Util.getCurrentTime()-iterationStart; } - } /** @@ -361,7 +376,7 @@ logger.log(Level.WARNING,"",e); } } - + /** * for processing EXP event (see spec. p 13) */ Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-29 15:26:39 UTC (rev 30) @@ -72,7 +72,7 @@ * flow window size, i.e. how many data packets are * in-flight at a single time */ - protected int flowWindowSize=64; + protected int flowWindowSize=128; /** * remote UDT entity (address and socket ID) Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java 2010-04-29 15:26:39 UTC (rev 30) @@ -33,6 +33,7 @@ package udt.packets; import java.io.ByteArrayOutputStream; +import udt.UDTPacket; import udt.UDTReceiver; import udt.UDTSender; @@ -57,14 +58,14 @@ private long estimatedLinkCapacity; public Acknowledgement(){ - this.contrlPktTyp=ControlPacketType.ACK.ordinal(); + this.controlPacketType=ControlPacketType.ACK.ordinal(); } public Acknowledgement(byte[] controlInformation){ this(); decodeControlInformation(controlInformation); } - + void decodeControlInformation(byte[] data){ ackNumber=PacketUtil.decode(data, 0); roundTripTime =PacketUtil.decode(data, 4); Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2010-04-29 15:26:39 UTC (rev 30) @@ -40,7 +40,7 @@ public class Acknowledgment2 extends ControlPacket{ public Acknowledgment2(){ - this.contrlPktTyp=ControlPacketType.ACK2.ordinal(); + this.controlPacketType=ControlPacketType.ACK2.ordinal(); } public Acknowledgment2(byte[]controlInformation){ Modified: udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2010-04-29 15:26:39 UTC (rev 30) @@ -52,14 +52,13 @@ private long socketID; public ConnectionHandshake(){ - this.contrlPktTyp=ControlPacketType.CONNECTION_HANDSHAKE.ordinal(); + this.controlPacketType=ControlPacketType.CONNECTION_HANDSHAKE.ordinal(); } public ConnectionHandshake(byte[]controlInformation){ this(); //this.controlInformation=controlInformation; - decode(controlInformation ); - + decode(controlInformation); } //faster than instanceof... Modified: udt-java/trunk/src/main/java/udt/packets/ControlPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-04-29 15:26:39 UTC (rev 30) @@ -40,7 +40,7 @@ public abstract class ControlPacket implements UDTPacket{ - protected int contrlPktTyp; + protected int controlPacketType; //used for ACK and ACK2 protected long ackSequenceNumber; @@ -58,18 +58,11 @@ public ControlPacket(){ } - - - public int getControlPaketType() { - return contrlPktTyp; - } - - public void setControlPaketType(int packetTyp) { - this.contrlPktTyp = packetTyp; + public int getControlPacketType(){ + return controlPacketType; } - - + public long getAckSequenceNumber() { return ackSequenceNumber; } @@ -112,7 +105,7 @@ // //sequence number with highest bit set to "0" try{ ByteArrayOutputStream bos=new ByteArrayOutputStream(16); - bos.write(PacketUtil.encodeHighesBitTypeAndSeqNumber(true, contrlPktTyp, ackSequenceNumber)); + bos.write(PacketUtil.encodeHighesBitTypeAndSeqNumber(true, controlPacketType, ackSequenceNumber)); bos.write(PacketUtil.encode(messageNumber)); bos.write(PacketUtil.encode(timeStamp)); bos.write(PacketUtil.encode(destinationID)); @@ -158,7 +151,7 @@ ControlPacket other = (ControlPacket) obj; if (ackSequenceNumber != other.ackSequenceNumber) return false; - if (contrlPktTyp != other.contrlPktTyp) + if (controlPacketType != other.controlPacketType) return false; //if (!Arrays.equals(controlInformation, other.controlInformation)) // return false; Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-04-29 15:26:39 UTC (rev 30) @@ -170,6 +170,10 @@ return false; } + public int getControlPacketType(){ + return -1; + } + public UDTSession getSession() { return session; } Modified: udt-java/trunk/src/main/java/udt/packets/KeepAlive.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/KeepAlive.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/packets/KeepAlive.java 2010-04-29 15:26:39 UTC (rev 30) @@ -32,11 +32,10 @@ package udt.packets; - public class KeepAlive extends ControlPacket{ public KeepAlive(){ - this.contrlPktTyp=ControlPacketType.KEEP_ALIVE.ordinal(); + this.controlPacketType=ControlPacketType.KEEP_ALIVE.ordinal(); } @Override Modified: udt-java/trunk/src/main/java/udt/packets/MessageDropRequest.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/MessageDropRequest.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/packets/MessageDropRequest.java 2010-04-29 15:26:39 UTC (rev 30) @@ -41,7 +41,7 @@ private long msgLastSeqNo; public MessageDropRequest(){ - this.contrlPktTyp=ControlPacketType.MESSAGE_DROP_REQUEST.ordinal(); + this.controlPacketType=ControlPacketType.MESSAGE_DROP_REQUEST.ordinal(); } public MessageDropRequest(byte[]controlInformation){ @@ -49,6 +49,7 @@ //this.controlInformation=controlInformation; decode(controlInformation ); } + void decode(byte[]data){ msgFirstSeqNo =PacketUtil.decode(data, 0); msgLastSeqNo =PacketUtil.decode(data, 4); Modified: udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/packets/NegativeAcknowledgement.java 2010-04-29 15:26:39 UTC (rev 30) @@ -37,6 +37,8 @@ import java.util.ArrayList; import java.util.List; +import udt.packets.ControlPacket.ControlPacketType; + /** * NAK carries information about lost packets * @@ -51,14 +53,14 @@ ByteArrayOutputStream lossInfo=new ByteArrayOutputStream(); public NegativeAcknowledgement(){ - this.contrlPktTyp=ControlPacketType.NAK.ordinal(); + this.controlPacketType=ControlPacketType.NAK.ordinal(); } public NegativeAcknowledgement(byte[]controlInformation){ this(); lostSequenceNumbers=decode(controlInformation); } - + /** * decode the loss info * @param lossInfo Modified: udt-java/trunk/src/main/java/udt/packets/PacketFactory.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-04-29 15:26:39 UTC (rev 30) @@ -105,7 +105,6 @@ } if(packet!=null){ - packet.setControlPaketType(pktType); packet.setAckSequenceNumber(ackSeqNo); packet.setMessageNumber(msgNr); packet.setTimeStamp(timeStamp); Modified: udt-java/trunk/src/main/java/udt/packets/Shutdown.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/Shutdown.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/packets/Shutdown.java 2010-04-29 15:26:39 UTC (rev 30) @@ -34,10 +34,11 @@ package udt.packets; + public class Shutdown extends ControlPacket{ public Shutdown(){ - this.contrlPktTyp=ControlPacketType.SHUTDOWN.ordinal(); + this.controlPacketType=ControlPacketType.SHUTDOWN.ordinal(); } @Override Modified: udt-java/trunk/src/main/java/udt/packets/UserDefined.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/UserDefined.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/packets/UserDefined.java 2010-04-29 15:26:39 UTC (rev 30) @@ -35,6 +35,10 @@ public class UserDefined extends ControlPacket{ + public UserDefined(){ + controlPacketType=ControlPacketType.USER_DEFINED.ordinal(); + } + //Explained by bits 4-15, //reserved for user defined Control Packet public UserDefined(byte[]controlInformation){ Modified: udt-java/trunk/src/main/java/udt/util/MeanValue.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-04-29 15:26:39 UTC (rev 30) @@ -20,19 +20,22 @@ private String msg; - public MeanValue(){ - this(false, 64); + private final String name; + + public MeanValue(String name){ + this(name, false, 64); } - public MeanValue(boolean verbose){ - this(verbose, 64); + public MeanValue(String name, boolean verbose){ + this(name, verbose, 64); } - public MeanValue(boolean verbose, int nValue){ + public MeanValue(String name, boolean verbose, int nValue){ format=NumberFormat.getNumberInstance(); format.setMaximumFractionDigits(2); this.verbose=verbose; this.nValue=nValue; + this.name=name; begin(); } @@ -69,4 +72,8 @@ this.msg=msg; addValue(Util.getCurrentTime()-start); } + + public String getName(){ + return name; + } } Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-29 15:26:39 UTC (rev 30) @@ -36,6 +36,7 @@ import java.io.FileOutputStream; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.text.NumberFormat; import udt.UDTClient; import udt.UDTInputStream; @@ -56,12 +57,15 @@ private final String serverHost; private final String remoteFile; private final String localFile; + private final NumberFormat format; public ReceiveFile(String serverHost, int serverPort, String remoteFile, String localFile){ this.serverHost=serverHost; this.serverPort=serverPort; this.remoteFile=remoteFile; this.localFile=localFile; + format=NumberFormat.getNumberInstance(); + format.setMaximumFractionDigits(3); } public void run(){ @@ -102,10 +106,9 @@ //and read the file data Util.copy(in, fos, size, false); long end = System.currentTimeMillis(); - long mb=size/(1024*1024); - double mbytes=1000*mb/(end-start); - double mbit=8*mbytes; - System.out.println("[ReceiveFile] Rate: "+(int)mbytes+" MBytes/sec. "+(int)mbit+" MBit/sec."); + double rate=1000.0*size/1024/1024/(end-start); + System.out.println("[ReceiveFile] Rate: "+format.format(rate)+" MBytes/sec. " + +format.format(8*rate)+" MBit/sec."); client.shutdown(); Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2010-04-29 15:26:39 UTC (rev 30) @@ -38,6 +38,7 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -50,6 +51,7 @@ private final AtomicInteger numberOfSentDataPackets=new AtomicInteger(0); private final AtomicInteger numberOfReceivedDataPackets=new AtomicInteger(0); private final AtomicInteger numberOfDuplicateDataPackets=new AtomicInteger(0); + private final AtomicInteger numberOfMissingDataEvents=new AtomicInteger(0); private final AtomicInteger numberOfNAKSent=new AtomicInteger(0); private final AtomicInteger numberOfNAKReceived=new AtomicInteger(0); private final AtomicInteger numberOfRetransmittedDataPackets=new AtomicInteger(0); @@ -65,9 +67,12 @@ private volatile long packetArrivalRate; private volatile long estimatedLinkCapacity; private volatile double sendPeriod; + private volatile long congestionWindowSize; private MessageDigest digest; - + + private final List<MeanValue>metrics=new ArrayList<MeanValue>(); + public UDTStatistics(String componentDescription){ this.componentDescription=componentDescription; try{ @@ -110,6 +115,9 @@ public void incNumberOfDuplicateDataPackets() { numberOfDuplicateDataPackets.incrementAndGet(); } + public void incNumberOfMissingDataEvents() { + numberOfMissingDataEvents.incrementAndGet(); + } public void incNumberOfNAKSent() { numberOfNAKSent.incrementAndGet(); } @@ -154,6 +162,14 @@ return sendPeriod; } + public long getCongestionWindowSize() { + return congestionWindowSize; + } + + public void setCongestionWindowSize(long congestionWindowSize) { + this.congestionWindowSize = congestionWindowSize; + } + public void updateReadDataMD5(byte[]data){ digest.update(data); } @@ -166,6 +182,22 @@ return packetArrivalRate; } + /** + * add a metric + * @param m - the metric to add + */ + public void addMetric(MeanValue m){ + metrics.add(m); + } + + /** + * get a read-only list containing all metrics + * @return + */ + public List<MeanValue>getMetrics(){ + return Collections.unmodifiableList(metrics); + } + public String toString(){ StringBuilder sb=new StringBuilder(); sb.append("Statistics for ").append(componentDescription).append("\n"); @@ -183,12 +215,20 @@ if(packetArrivalRate>0){ sb.append("Packet rate: ").append(packetArrivalRate).append("/sec., link capacity: ").append(estimatedLinkCapacity).append("/sec.\n"); } + if(numberOfMissingDataEvents.get()>0){ + sb.append("Sender without data events: ").append(numberOfMissingDataEvents.get()).append("\n"); + } if(numberOfCCSlowDownEvents.get()>0){ sb.append("CC rate slowdown events: ").append(numberOfCCSlowDownEvents.get()).append("\n"); } if(numberOfCCWindowExceededEvents.get()>0){ sb.append("CC window slowdown events: ").append(numberOfCCWindowExceededEvents.get()).append("\n"); } + sb.append("CC parameter SND: ").append((int)sendPeriod).append("\n"); + sb.append("CC parameter CWND: ").append(congestionWindowSize).append("\n"); + for(MeanValue v: metrics){ + sb.append(v.getName()).append(": ").append(v.getFormattedMean()).append("\n"); + } return sb.toString(); } Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-29 15:26:39 UTC (rev 30) @@ -14,14 +14,13 @@ import udt.UDTServerSocket; import udt.UDTSocket; import udt.UDTTestBase; -import udt.util.UDTStatistics; public class TestUDTLargeData extends UDTTestBase{ boolean running=false; //how many - int num_packets=50; + int num_packets=20; //how large is a single packet int size=1*1024*1024; @@ -89,7 +88,7 @@ System.out.println("MD5 hash of data received: "+md5_received); System.out.println(client.getStatistics()); - assertEquals(md5_sent,md5_received); + //assertEquals(md5_sent,md5_received); //store stat history to csv file client.getStatistics().writeParameterHistory(File.createTempFile("/udtstats-",".csv")); @@ -102,7 +101,6 @@ volatile String md5_received=null; private void runServer()throws Exception{ - final MessageDigest md5=MessageDigest.getInstance("MD5"); final UDTServerSocket serverSocket=new UDTServerSocket(InetAddress.getByName("localhost"),65321); @@ -120,13 +118,12 @@ c=is.read(buf); if(c<0)break; else{ - md5.update(buf, 0, c); total+=c; } } System.out.println("Server thread exiting, last received bytes: "+c); serverRunning=false; - md5_received=UDTStatistics.hexString(md5); + md5_received=s.getSession().getStatistics().getDigest(); serverSocket.shutDown(); System.out.println(s.getSession().getStatistics()); } Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-29 09:27:13 UTC (rev 29) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-29 15:26:39 UTC (rev 30) @@ -4,6 +4,8 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.SynchronousQueue; import junit.framework.TestCase; import udt.UDPEndPoint; @@ -19,9 +21,9 @@ public void test1()throws Exception{ runServer(); + runThirdThread(); //client socket DatagramSocket s=new DatagramSocket(12345); - //generate a test array with random content N=num_packets*packetSize; byte[]data=new byte[packetSize]; @@ -31,12 +33,15 @@ dp.setAddress(InetAddress.getByName("localhost")); dp.setPort(65321); System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes"); - MeanValue v=new MeanValue(); + MeanValue v=new MeanValue("Datagram send time",false); + MeanValue v2=new MeanValue("Datagram send interval",false); for(int i=0;i<num_packets;i++){ dp.setData(data); + v2.end(); v.begin(); s.send(dp); v.end(); + v2.begin(); } System.out.println("Finished sending."); while(serverRunning)Thread.sleep(10); @@ -46,6 +51,7 @@ System.out.println("Rate "+N/1000/(end-start)+" Mbytes/sec"); System.out.println("Rate "+num_packets+" packets/sec"); System.out.println("Mean send time "+v.getFormattedMean()+" microsec"); + System.out.println("Mean send interval "+v2.getFormattedMean()+" microsec"); System.out.println("Server received: "+total); } @@ -62,9 +68,30 @@ try{ byte[]buf=new byte[packetSize]; DatagramPacket dp=new DatagramPacket(buf,buf.length); + while(true){ + serverSocket.receive(dp); + handoff.offer(dp); + } + } + catch(Exception e){ + e.printStackTrace(); + } + serverRunning=false; + } + }; + Thread t=new Thread(serverProcess); + t.start(); + } + + private final BlockingQueue<DatagramPacket> handoff=new SynchronousQueue<DatagramPacket>(); + + private void runThirdThread()throws Exception{ + Runnable serverProcess=new Runnable(){ + public void run(){ + try{ long start=System.currentTimeMillis(); while(true){ - serverSocket.receive(dp); + DatagramPacket dp=handoff.poll(); total+=dp.getLength(); if(total==N)break; } @@ -80,5 +107,7 @@ }; Thread t=new Thread(serverProcess); t.start(); + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |