[Udt-java-commits] SF.net SVN: udt-java:[28] udt-java/trunk
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2010-04-28 14:52:04
|
Revision: 28
http://udt-java.svn.sourceforge.net/udt-java/?rev=28&view=rev
Author: bschuller
Date: 2010-04-28 14:51:57 +0000 (Wed, 28 Apr 2010)
Log Message:
-----------
some cleanup
Modified Paths:
--------------
udt-java/trunk/pom.xml
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
udt-java/trunk/src/main/java/udt/UDTInputStream.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/UDTSession.java
udt-java/trunk/src/main/java/udt/UDTSocket.java
udt-java/trunk/src/main/java/udt/packets/Destination.java
udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java
udt-java/trunk/src/main/java/udt/util/MeanValue.java
udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
udt-java/trunk/src/main/java/udt/util/SendFile.java
udt-java/trunk/src/main/java/udt/util/Util.java
udt-java/trunk/src/test/java/udt/TestList.java
udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
udt-java/trunk/src/test/java/udt/performance/UDPTest.java
Removed Paths:
-------------
udt-java/trunk/src/main/java/udt/util/FlowWindow.java
Modified: udt-java/trunk/pom.xml
===================================================================
--- udt-java/trunk/pom.xml 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/pom.xml 2010-04-28 14:51:57 UTC (rev 28)
@@ -5,7 +5,7 @@
<artifactId>udt-java</artifactId>
<packaging>jar</packaging>
<name>UDT Java implementation</name>
- <version>0.2-SNAPSHOT</version>
+ <version>0.4-SNAPSHOT</version>
<url>http://sourceforge.net/projects/udt-java</url>
<developers>
<developer>
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -50,6 +50,7 @@
import udt.packets.ConnectionHandshake;
import udt.packets.Destination;
import udt.packets.PacketFactory;
+import udt.util.MeanValue;
import udt.util.UDTThreadFactory;
/**
@@ -82,7 +83,7 @@
private volatile boolean stopped=false;
public static final int DATAGRAM_SIZE=1500;
-
+
/**
* bind to any local port on the given host address
* @param localAddress
@@ -113,6 +114,8 @@
sessionHandoff=new SynchronousQueue<UDTSession>();
//set a time out to avoid blocking in doReceive()
dgSocket.setSoTimeout(1000);
+ //buffer size
+ dgSocket.setReceiveBufferSize(512*1024);
}
/**
@@ -237,6 +240,7 @@
*/
private long lastDestID=-1;
private UDTSession lastSession;
+ MeanValue v=new MeanValue(true,64);
protected void doReceive()throws IOException{
try{
try{
@@ -294,12 +298,11 @@
logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex);
}
}
-
+
protected void doSend(UDTPacket packet)throws IOException{
byte[]data=packet.getEncoded();
- Destination dest=packet.getSession().getDestination();
- DatagramPacket dgp = new DatagramPacket(data, data.length,
- dest.getAddress() , dest.getPort());
+ DatagramPacket dgp = packet.getSession().getDatagram();
+ dgp.setData(data);
dgSocket.send(dgp);
}
Modified: udt-java/trunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/UDTCongestionControl.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -30,7 +30,7 @@
private long estimatedLinkCapacity=0;
// Packet sending period = packet send interval, in microseconds
- private double packetSendingPeriod=1;
+ private double packetSendingPeriod=0;
// Congestion window size, in packets
private long congestionWindowSize=16;
@@ -228,7 +228,10 @@
// c. Record the current largest sent sequence number (LastDecSeq).
lastDecreaseSeqNo= currentMaxSequenceNumber;
}
-
+
+ //enforce upper limit on send period...
+ //packetSendingPeriod=Math.min(packetSendingPeriod, 2*roundTripTime);
+
statistics.setSendPeriod(packetSendingPeriod);
return;
}
Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -34,10 +34,10 @@
import java.io.IOException;
import java.io.InputStream;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import udt.util.FlowWindow;
import udt.util.UDTStatistics;
/**
@@ -54,7 +54,7 @@
//inbound application data, in-order, and ready for reading
//by the application
- private final FlowWindow<AppData>appData;
+ private final PriorityBlockingQueue<AppData>appData;
private final UDTStatistics statistics;
@@ -78,13 +78,9 @@
public UDTInputStream(UDTSocket socket, UDTStatistics statistics)throws IOException{
this.socket=socket;
this.statistics=statistics;
- appData=new FlowWindow<AppData>(getFlowWindowSize());
+ appData=new PriorityBlockingQueue<AppData>(128);
}
- private int getFlowWindowSize(){
- if(socket!=null)return 2*socket.getSession().getFlowWindowSize();
- else return 128;
- }
/**
* create a new {@link UDTInputStream} connected to the given socket
* @param socket - the {@link UDTSocket}
@@ -172,6 +168,7 @@
}
}
else currentChunk=appData.poll(10, TimeUnit.MILLISECONDS);
+
}catch(InterruptedException ie){
IOException ex=new IOException();
ex.initCause(ie);
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -167,9 +167,7 @@
//incoming packets are ordered by sequence number, with control packets having
//preference over data packets
- handoffQueue=//new ArrayBlockingQueue<UDTPacket>(session.getFlowWindowSize());
- new PriorityBlockingQueue<UDTPacket>(session.getFlowWindowSize());
-
+ handoffQueue=new PriorityBlockingQueue<UDTPacket>(session.getFlowWindowSize());
start();
}
@@ -198,8 +196,6 @@
/*
* packets are written by the endpoint
*/
- long i=0;
- long mean=0;
protected void receive(UDTPacket p)throws IOException{
handoffQueue.offer(p);
}
@@ -247,6 +243,7 @@
}
processUDTPacket(packet);
}
+
Thread.yield();
}
@@ -326,6 +323,7 @@
protected void processUDTPacket(UDTPacket p)throws IOException{
//(3).Check the packet type and process it according to this.
+
if(p instanceof DataPacket){
DataPacket dp=(DataPacket)p;
onDataPacketReceived(dp);
@@ -340,8 +338,6 @@
onShutdown();
}
- //other packet types?
-
}
//every nth packet will be discarded... for testing only of course
@@ -375,11 +371,8 @@
//store current time
lastDataPacketArrivalTime=currentDataPacketArrivalTime;
- if(!session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData())){
- //no left space in application data buffer->drop this packet
- return;
- }
-
+ session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData());
+
//(6).number of detected lossed packet
/*(6.a).if the number of the current data packet is greater than LSRN+1,
put all the sequence numbers between (but excluding) these two values
@@ -387,13 +380,11 @@
if(currentSequenceNumber>largestReceivedSeqNumber+1){
sendNAK(currentSequenceNumber);
}
- else{
- if(currentSequenceNumber<largestReceivedSeqNumber){
+ else if(currentSequenceNumber<largestReceivedSeqNumber){
/*(6.b).if the sequence number is less than LRSN,remove it from
* the receiver's loss list
*/
receiverLossList.remove(currentSequenceNumber);
- }
}
statistics.incNumberOfReceivedDataPackets();
@@ -422,6 +413,7 @@
receiverLossList.insert(detectedLossSeqNumber);
}
endpoint.doSend(nAckPacket);
+ statistics.incNumberOfNAKSent();
}
protected void sendNAK(List<Long>sequenceNumbers)throws IOException{
@@ -447,7 +439,7 @@
estimateLinkCapacity=packetPairWindow.getEstimatedLinkCapacity();
acknowledgmentPkt.setEstimatedLinkCapacity(estimateLinkCapacity);
//set the packet arrival rate
- packetArrivalSpeed=(long)packetHistoryWindow.getPacketArrivalSpeed();
+ packetArrivalSpeed=packetHistoryWindow.getPacketArrivalSpeed();
acknowledgmentPkt.setPacketReceiveRate(packetArrivalSpeed);
endpoint.doSend(acknowledgmentPkt);
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -96,36 +96,46 @@
//last acknowledge number, initialised to the initial sequence number
private long lastAckSequenceNumber;
- //size of the send queue
- public final int sendQueueLength;
+ private volatile boolean started=false;
private volatile boolean stopped=false;
- private volatile AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>();
+ private final CountDownLatch startLatch=new CountDownLatch(1);
- private volatile AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>();
+ private final AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>();
+ private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>();
+
public UDTSender(UDTSession session,UDPEndPoint endpoint){
if(!session.isReady())throw new IllegalStateException("UDTSession is not ready.");
this.endpoint= endpoint;
this.session=session;
-
statistics=session.getStatistics();
- sendQueueLength=64;//session.getFlowWindowSize();
senderLossList=new SenderLossList();
- sendBuffer=new ConcurrentHashMap<Long, DataPacket>(sendQueueLength,0.75f,2);
- sendQueue = new LinkedBlockingQueue<DataPacket>(sendQueueLength);
+ sendBuffer=new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(),0.75f,2);
+ sendQueue = new LinkedBlockingQueue<DataPacket>(1000);
lastAckSequenceNumber=session.getInitialSequenceNumber();
waitForAckLatch.set(new CountDownLatch(1));
waitForSeqAckLatch.set(new CountDownLatch(1));
- start();
+ doStart();
}
+ /**
+ * start the sender thread
+ */
+ public void start(){
+ logger.info("Starting sender for "+session);
+ startLatch.countDown();
+ started=true;
+ }
+
//starts the sender algorithm
- private void start(){
+ private void doStart(){
Runnable r=new Runnable(){
public void run(){
try{
+ //wait until explicitely started
+ startLatch.await();
while(!stopped){
senderAlgorithm();
}
@@ -150,7 +160,7 @@
* @param data
* @throws IOException
* @throws InterruptedException
- */
+ */
private void send(DataPacket p)throws IOException{
synchronized(sendLock){
endpoint.doSend(p);
@@ -161,15 +171,6 @@
}
/**
- * writes a data packet into the sendQueue
- * @return <code>true</code>if the packet was added, <code>false</code> if the
- * packet could not be added because the queue was full
- */
- protected boolean sendUdtPacket(DataPacket p)throws IOException{
- return sendQueue.offer(p);
- }
-
- /**
* writes a data packet into the sendQueue, waiting at most for the specified time
* if this is not possible due to a full send queue
*
@@ -183,6 +184,7 @@
* @throws InterruptedException
*/
protected boolean sendUdtPacket(DataPacket p, int timeout, TimeUnit units)throws IOException,InterruptedException{
+ if(!started)start();
return sendQueue.offer(p,timeout,units);
}
@@ -244,7 +246,7 @@
*/
protected void onNAKPacketReceived(NegativeAcknowledgement nak){
waitForAckLatch.get().countDown();
-
+
for(Integer i: nak.getDecodedLossInfo()){
senderLossList.insert(new SenderLossListEntry(i));
}
@@ -252,12 +254,12 @@
session.getSocket().getReceiver().resetEXPTimer();
statistics.incNumberOfNAKReceived();
statistics.storeParameters();
-
+
if(logger.isLoggable(Level.FINER)){
logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, "
- +"set send period to "+session.getCongestionControl().getSendInterval());
+ +"set send period to "+session.getCongestionControl().getSendInterval());
}
-
+
return;
}
@@ -282,56 +284,45 @@
*/
public void senderAlgorithm()throws InterruptedException, IOException{
long iterationStart=Util.getCurrentTime();
+
//if the sender's loss list is not empty
SenderLossListEntry entry=senderLossList.getFirstEntry();
if (entry!=null) {
- long seqNumber = entry.getSequenceNumber();
- //TODO
- //if the current seqNumber is 16n,check the timeOut in the
- //loss list and send a message drop request.
- //if((seqNumber%16)==0){
- //sendLossList.checkTimeOut(timeToLive);
- //}
- try {
- //retransmit the packet with the first entry in the list
- //as sequence number and remove it from the list
- DataPacket pktToRetransmit = sendBuffer.get(seqNumber);
- if(pktToRetransmit!=null){
- endpoint.doSend(pktToRetransmit);
- statistics.incNumberOfRetransmittedDataPackets();
- }
- }catch (Exception e) {
- logger.log(Level.WARNING,"",e);
- }
- // return;
+ handleResubmit(entry);
}
-
- //if the number of unacknowledged data packets does not exceed the congestion
- //and the flow window sizes, pack a new packet
- int unAcknowledged=unacknowledged.get();
- if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
- && unAcknowledged<session.getFlowWindowSize()){
-
- if(sendQueue.size()==0){
- //Thread.yield();
+ else
+ {
+ //if the number of unacknowledged data packets does not exceed the congestion
+ //and the flow window sizes, pack a new packet
+ int unAcknowledged=unacknowledged.get();
+
+ if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
+ && unAcknowledged<session.getFlowWindowSize()){
+ //check for application data
+ DataPacket dp=sendQueue.poll();//10*Util.getSYNTime(),TimeUnit.MICROSECONDS);
+ if(dp!=null){
+ send(dp);
+ largestSentSequenceNumber=dp.getPacketSequenceNumber();
+ }
+ else {
+ Thread.yield();
+ return;
+ }
+ }else{
+ //congestion window full, should we *really* wait for an ack?!
+ if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){
+ statistics.incNumberOfCCWindowExceededEvents();
+ }
+ Thread.sleep(1);
+ //waitForAck();
return;
}
- DataPacket dp=sendQueue.poll(20,TimeUnit.MILLISECONDS);
- if(dp!=null){
- send(dp);
- largestSentSequenceNumber=dp.getPacketSequenceNumber();
- }
- }else{
- //should we *really* wait for an ack?!
- if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){
- statistics.incNumberOfCCWindowExceededEvents();
- }
- Thread.sleep(1);
- //waitForAck();
}
//wait
+
+
double snd=session.getCongestionControl().getSendInterval();
long passed=Util.getCurrentTime()-iterationStart;
int x=0;
@@ -339,17 +330,42 @@
if(x++==0)statistics.incNumberOfCCSlowDownEvents();
//we cannot wait with microsecond precision
if(snd-passed>750)Thread.sleep(1);
- else Thread.yield();
+ else if((snd-passed)/snd > 0.9){
+ return;
+ }
passed=Util.getCurrentTime()-iterationStart;
}
-
+
}
/**
+ * re-submits an entry from the sender loss list
+ * @param entry
+ */
+ protected void handleResubmit(SenderLossListEntry entry){
+ long seqNumber = entry.getSequenceNumber();
+ //TODO
+ //if the current seqNumber is 16n,check the timeOut in the
+ //loss list and send a message drop request.
+ //if((seqNumber%16)==0){
+ //sendLossList.checkTimeOut(timeToLive);
+ //}
+ try {
+ //retransmit the packet and remove it from the list
+ DataPacket pktToRetransmit = sendBuffer.get(seqNumber);
+ if(pktToRetransmit!=null){
+ endpoint.doSend(pktToRetransmit);
+ statistics.incNumberOfRetransmittedDataPackets();
+ }
+ }catch (Exception e) {
+ logger.log(Level.WARNING,"",e);
+ }
+ }
+
+ /**
* for processing EXP event (see spec. p 13)
*/
protected void putUnacknowledgedPacketsIntoLossList(){
-
synchronized (sendLock) {
for(Long l: sendBuffer.keySet()){
senderLossList.insert(new SenderLossListEntry(l));
@@ -391,7 +407,7 @@
return largestSentSequenceNumber>=sequenceNumber;
}
-
+
boolean haveLostPackets(){
return !senderLossList.isEmpty();
}
Modified: udt-java/trunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -32,6 +32,7 @@
package udt;
+import java.net.DatagramPacket;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
@@ -64,11 +65,14 @@
protected final CongestionControl cc;
+ //cache dgPacket (peer stays the same always)
+ private DatagramPacket dgPacket;
+
/**
* flow window size, i.e. how many data packets are
* in-flight at a single time
*/
- protected int flowWindowSize=1024;
+ protected int flowWindowSize=64;
/**
* remote UDT entity (address and socket ID)
@@ -105,7 +109,7 @@
statistics=new UDTStatistics(description);
mySocketID=nextSocketID.incrementAndGet();
this.destination=destination;
-
+ this.dgPacket=new DatagramPacket(new byte[0],0,destination.getAddress(),destination.getPort());
//init configurable CC
String clazzP=System.getProperty(CC_CLASS,UDTCongestionControl.class.getName());
Object ccObject=null;
@@ -210,4 +214,7 @@
this.initialSequenceNumber=initialSequenceNumber;
}
+ public DatagramPacket getDatagram(){
+ return dgPacket;
+ }
}
Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -148,7 +148,7 @@
*/
protected void doWrite(byte[]data, int offset, int length)throws IOException{
try{
- doWrite(data, offset, length, 5, TimeUnit.MILLISECONDS);
+ doWrite(data, offset, length, Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}catch(InterruptedException ie){
IOException io=new IOException();
io.initCause(ie);
@@ -163,14 +163,13 @@
* @param length
* @param timeout
* @param units
- * @throws IOException
+ * @throws IOException - if data cannot be sent
* @throws InterruptedException
*/
protected void doWrite(byte[]data, int offset, int length, int timeout, TimeUnit units)throws IOException,InterruptedException{
int chunksize=session.getDatagramSize()-24;//need some bytes for the header
ByteBuffer bb=ByteBuffer.wrap(data,offset,length);
long seqNo=0;
- int i=0;
while(bb.remaining()>0){
int len=Math.min(bb.remaining(),chunksize);
byte[]chunk=new byte[len];
@@ -182,10 +181,9 @@
packet.setDestinationID(session.getDestination().getSocketID());
packet.setData(chunk);
//put the packet into the send queue
- while(!sender.sendUdtPacket(packet, timeout, units)){
- Thread.sleep(1);
+ if(!sender.sendUdtPacket(packet, timeout, units)){
+ throw new IOException("Queue full");
}
- i++;
}
if(length>0)active=true;
}
Modified: udt-java/trunk/src/main/java/udt/packets/Destination.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/Destination.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/packets/Destination.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -33,7 +33,6 @@
package udt.packets;
import java.net.InetAddress;
-import java.net.UnknownHostException;
public class Destination {
@@ -49,7 +48,7 @@
this.port=port;
}
- public InetAddress getAddress()throws UnknownHostException{
+ public InetAddress getAddress(){
return address;
}
Modified: udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -34,13 +34,8 @@
import udt.util.CircularArray;
-
-
/**
* A circular array that records the packet arrival times
- *
- *
- *
*/
public class PacketHistoryWindow extends CircularArray<Long>{
@@ -57,7 +52,7 @@
* (see specification section 6.2, page 12)
* @return the current value
*/
- public double getPacketArrivalSpeed(){
+ public long getPacketArrivalSpeed(){
if(!haveOverflow)return 0;
int num=max-1;
double AI;
@@ -94,7 +89,7 @@
else{
medianPacketArrivalSpeed=0;
}
- return medianPacketArrivalSpeed;
+ return (long)Math.ceil(medianPacketArrivalSpeed);
}
}
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -72,4 +72,8 @@
public long size(){
return backingList.size();
}
+
+ public String toString(){
+ return backingList.toString();
+ }
}
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossListEntry.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -95,6 +95,6 @@
}
public String toString(){
- return "lossListEntry-"+sequenceNumber;
+ return "lost-"+sequenceNumber;
}
}
Deleted: udt-java/trunk/src/main/java/udt/util/FlowWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/util/FlowWindow.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -1,87 +0,0 @@
-/*********************************************************************************
- * Copyright (c) 2010 Forschungszentrum Juelich GmbH
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * (1) Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the disclaimer at the end. Redistributions in
- * binary form must reproduce the above copyright notice, this list of
- * conditions and the following disclaimer in the documentation and/or other
- * materials provided with the distribution.
- *
- * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its
- * contributors may be used to endorse or promote products derived from this
- * software without specific prior written permission.
- *
- * DISCLAIMER
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *********************************************************************************/
-
-package udt.util;
-
-import java.util.concurrent.PriorityBlockingQueue;
-
-/**
- * bounded queue
- *
- */
-public class FlowWindow<E> extends PriorityBlockingQueue<E> {
-
- private static final long serialVersionUID=1l;
-
- private volatile int capacity;
-
- /**
- * create a new flow window with the given size
- *
- * @param size - the initial size of the flow window
- */
- public FlowWindow(int size){
- super();
- this.capacity=size;
- }
-
- /**
- * create a new flow window with the default size of 16
- */
- public FlowWindow(){
- this(16);
- }
-
- public void setCapacity(int newSize){
- capacity=newSize;
- }
-
- public int getCapacity(){
- return capacity;
- }
-
- /**
- * try to add an element to the queue, return false if it is not possible
- */
- @Override
- public boolean offer(E e) {
- if(contains(e)){
- return true;
- }
- if(size()<capacity){
- return super.offer(e);
- }else return false;
- }
-
-
-
-}
Modified: udt-java/trunk/src/main/java/udt/util/MeanValue.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -13,14 +13,36 @@
private final NumberFormat format;
+ private final boolean verbose;
+ private final long nValue;
+ private long start;
+
+ private String msg;
+
public MeanValue(){
+ this(false, 64);
+ }
+
+ public MeanValue(boolean verbose){
+ this(verbose, 64);
+ }
+
+ public MeanValue(boolean verbose, int nValue){
format=NumberFormat.getNumberInstance();
format.setMaximumFractionDigits(2);
+ this.verbose=verbose;
+ this.nValue=nValue;
+ begin();
}
+
public void addValue(double value){
mean=(mean*n+value)/(n+1);
n++;
+ if(verbose && n % nValue == 1){
+ if(msg!=null)System.out.print(msg+" ");
+ System.out.println(getFormattedMean());
+ }
}
public double getMean(){
@@ -35,4 +57,16 @@
mean=0;
n=0;
}
+
+ public void begin(){
+ start=Util.getCurrentTime();
+ }
+
+ public void end(){
+ addValue(Util.getCurrentTime()-start);
+ }
+ public void end(String msg){
+ this.msg=msg;
+ addValue(Util.getCurrentTime()-start);
+ }
}
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -108,6 +108,9 @@
System.out.println("[ReceiveFile] Rate: "+(int)mbytes+" MBytes/sec. "+(int)mbit+" MBit/sec.");
client.shutdown();
+
+ if(verbose)System.out.println(client.getStatistics());
+
}finally{
fos.close();
}
Modified: udt-java/trunk/src/main/java/udt/util/SendFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -151,7 +151,7 @@
out.write(PacketUtil.encode(size));
long start=System.currentTimeMillis();
//and send the file
- Util.copy(fis, out, size, true);
+ Util.copy(fis, out, size, false);
long end=System.currentTimeMillis();
System.out.println(socket.getSession().getStatistics().toString());
double rate=1000.0*size/1024/1024/(end-start);
Modified: udt-java/trunk/src/main/java/udt/util/Util.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -121,7 +121,7 @@
* @throws IOException
*/
public static void copy(InputStream source, OutputStream target, long size, boolean flush)throws IOException{
- byte[]buf=new byte[1*1024*1024];
+ byte[]buf=new byte[65536];
int c;
long read=0;
while(true){
@@ -149,5 +149,5 @@
p.setPort(clientPort);
endpoint.sendRaw(p);
}
-
+
}
Modified: udt-java/trunk/src/test/java/udt/TestList.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestList.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/test/java/udt/TestList.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -12,7 +12,6 @@
import udt.sender.SenderLossList;
import udt.sender.SenderLossListEntry;
import udt.util.CircularArray;
-import udt.util.FlowWindow;
/*
* tests for the various list and queue classes
@@ -33,27 +32,16 @@
c.add(11);
System.out.println(c);
}
-
- public void testFlowWindow(){
- FlowWindow<Long>f=new FlowWindow<Long>(5);
- for(int i=0;i<5;i++){
- System.out.println(i);
- assertTrue(f.add(Long.valueOf(i)));
- }
- assertFalse(f.add(0l));
- f.setCapacity(6);
- assertTrue(f.add(0l));
- }
-
+
public void testPacketHistoryWindow(){
PacketHistoryWindow packetHistoryWindow = new PacketHistoryWindow(16);
-
- for(int i=0;i<17;i++){
- packetHistoryWindow.add(i*5000l);
+ long offset=1000000;
+ for(int i=0;i<28;i++){
+ packetHistoryWindow.add(offset+i*5000l);
}
//packets arrive every 5 ms, so packet arrival rate is 200/sec
- assertEquals(200.0,packetHistoryWindow.getPacketArrivalSpeed());
+ assertEquals(200,packetHistoryWindow.getPacketArrivalSpeed());
}
@@ -109,6 +97,9 @@
d1.setPacketSequenceNumber(1);
DataPacket d2=new DataPacket();
d2.setPacketSequenceNumber(2);
+ DataPacket d3=new DataPacket();
+ d3.setPacketSequenceNumber(3);
+ q.offer(d3);
q.offer(d2);
q.offer(d1);
q.offer(control);
@@ -116,16 +107,28 @@
UDTPacket p1=q.poll();
assertTrue(p1.isControlPacket());
- UDTPacket p2=q.poll();
- assertFalse(p2.isControlPacket());
+ UDTPacket p=q.poll();
+ assertFalse(p.isControlPacket());
//check ordering by sequence number
- assertEquals(1,p2.getPacketSequenceNumber());
+ assertEquals(1,p.getPacketSequenceNumber());
- UDTPacket p3=q.poll();
- assertFalse(p3.isControlPacket());
- assertEquals(2,p3.getPacketSequenceNumber());
+ DataPacket d=new DataPacket();
+ d.setPacketSequenceNumber(54);
+ q.offer(d);
+ p=q.poll();
+ assertFalse(p.isControlPacket());
+ assertEquals(2,p.getPacketSequenceNumber());
+ p=q.poll();
+ assertFalse(p.isControlPacket());
+ assertEquals(3,p.getPacketSequenceNumber());
+
+ p=q.poll();
+ assertFalse(p.isControlPacket());
+ assertEquals(54,p.getPacketSequenceNumber());
+
+
}
}
Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -17,7 +17,8 @@
Thread.sleep(500);
}while(!serverStarted);
- File f=new File("src/test/java/datafile");
+ //File f=new File("src/test/java/datafile");
+ File f=new File("/tmp/100MB");
File tmp=File.createTempFile("udtest-", null);
Modified: udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -29,7 +29,6 @@
//set an artificial loss rate
public void testWithLoss()throws Exception{
UDTReceiver.dropRate=3;
- UDTReceiver.connectionExpiryDisabled=true;
TIMEOUT=Integer.MAX_VALUE;
num_packets=512;
//set log level
@@ -40,7 +39,6 @@
//send even more data
public void testLargeDataSet()throws Exception{
UDTReceiver.dropRate=0;
- UDTReceiver.connectionExpiryDisabled=true;
TIMEOUT=Integer.MAX_VALUE;
num_packets=3*1024;
//set log level
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -21,7 +21,7 @@
boolean running=false;
//how many
- int num_packets=100;
+ int num_packets=50;
//how large is a single packet
int size=1*1024*1024;
@@ -35,7 +35,6 @@
//System.setProperty(UDTSession.CC_CLASS, NullCongestionControl.class.getName());
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
- //UDTReceiver.connectionExpiryDisabled=true;
doTest();
}
@@ -56,12 +55,12 @@
MessageDigest digest=MessageDigest.getInstance("MD5");
while(!serverRunning)Thread.sleep(100);
long start=System.currentTimeMillis();
- System.out.println("Sending <"+num_packets+"> packets of <"+size/1024/1024+"> Mbytes each");
+ System.out.println("Sending <"+num_packets+"> packets of <"+format.format(size/1024.0/1024.0)+"> Mbytes each");
long end=0;
if(serverRunning){
for(int i=0;i<num_packets;i++){
long block=System.currentTimeMillis();
- client.sendBlocking(data);
+ client.send(data);
digest.update(data);
double took=System.currentTimeMillis()-block;
double arrival=client.getStatistics().getPacketArrivalRate();
@@ -71,6 +70,7 @@
+ " snd: "+format.format(snd)
+" rate: "+format.format(size/(1024*took))+ " MB/sec");
}
+ client.flush();
end=System.currentTimeMillis();
client.shutdown();
}else throw new IllegalStateException();
Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-26 06:36:47 UTC (rev 27)
+++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-04-28 14:51:57 UTC (rev 28)
@@ -7,13 +7,14 @@
import junit.framework.TestCase;
import udt.UDPEndPoint;
+import udt.util.MeanValue;
/**
* send some data over a UDP connection and measure performance
*/
public class UDPTest extends TestCase {
- final int num_packets=5*1000;
+ final int num_packets=10*1000;
final int packetSize=UDPEndPoint.DATAGRAM_SIZE;
public void test1()throws Exception{
@@ -30,9 +31,12 @@
dp.setAddress(InetAddress.getByName("localhost"));
dp.setPort(65321);
System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes");
+ MeanValue v=new MeanValue();
for(int i=0;i<num_packets;i++){
dp.setData(data);
+ v.begin();
s.send(dp);
+ v.end();
}
System.out.println("Finished sending.");
while(serverRunning)Thread.sleep(10);
@@ -41,6 +45,7 @@
System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms");
System.out.println("Rate "+N/1000/(end-start)+" Mbytes/sec");
System.out.println("Rate "+num_packets+" packets/sec");
+ System.out.println("Mean send time "+v.getFormattedMean()+" microsec");
System.out.println("Server received: "+total);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|