udt-java-commits Mailing List for UDT-Java
Status: Alpha
Brought to you by:
bschuller
You can subscribe to this list here.
| 2010 | Jan | Feb | Mar | Apr (17) | May (4) | Jun (1) | Jul | Aug (3) | Sep (7) | Oct | Nov (1) | Dec |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2011 | Jan (1) | Feb (1) | Mar | Apr | May | Jun | Jul | Aug (10) | Sep | Oct (1) | Nov (1) | Dec (2) |
| 2012 | Jan | Feb (4) | Mar (1) | Apr | May (1) | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
|
From: <bsc...@us...> - 2012-05-25 08:03:15
|
Revision: 74
http://udt-java.svn.sourceforge.net/udt-java/?rev=74&view=rev
Author: bschuller
Date: 2012-05-25 08:03:03 +0000 (Fri, 25 May 2012)
Log Message:
-----------
attempt to switch to new UDT-C++ style of connection handshake. DOES NOT WORK yet
Modified Paths:
--------------
udt-java/trunk/pom.xml
udt-java/trunk/src/main/java/udt/ClientSession.java
udt-java/trunk/src/main/java/udt/ServerSession.java
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTClient.java
udt-java/trunk/src/main/java/udt/UDTInputStream.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/UDTServerSocket.java
udt-java/trunk/src/main/java/udt/UDTSession.java
udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java
udt-java/trunk/src/main/java/udt/packets/PacketFactory.java
udt-java/trunk/src/main/java/udt/packets/PacketUtil.java
udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/trunk/src/main/java/udt/util/SequenceNumber.java
udt-java/trunk/src/test/java/echo/EchoServer.java
udt-java/trunk/src/test/java/udt/TestUDTInputStream.java
udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java
Modified: udt-java/trunk/pom.xml
===================================================================
--- udt-java/trunk/pom.xml 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/pom.xml 2012-05-25 08:03:03 UTC (rev 74)
@@ -5,7 +5,7 @@
<artifactId>udt-java</artifactId>
<packaging>jar</packaging>
<name>UDT Java implementation</name>
- <version>0.6-SNAPSHOT</version>
+ <version>0.7-SNAPSHOT</version>
<url>http://sourceforge.net/projects/udt-java</url>
<developers>
<developer>
Modified: udt-java/trunk/src/main/java/udt/ClientSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/ClientSession.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/ClientSession.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -52,6 +52,8 @@
private UDPEndPoint endPoint;
+ long initialSequenceNo=SequenceNumber.random();
+
public ClientSession(UDPEndPoint endPoint, Destination dest)throws SocketException{
super("ClientSession localPort="+endPoint.getLocalPort(),dest);
this.endPoint=endPoint;
@@ -60,7 +62,7 @@
/**
* send connection handshake until a reply from server is received
- * TODO check for timeout
+
* @throws InterruptedException
* @throws IOException
*/
@@ -68,11 +70,21 @@
public void connect() throws InterruptedException,IOException{
int n=0;
while(getState()!=ready){
- sendHandShake();
if(getState()==invalid)throw new IOException("Can't connect!");
- n++;
- if(getState()!=ready)Thread.sleep(500);
+ if(getState()<=handshaking){
+ setState(handshaking);
+ sendInitialHandShake();
+ }
+ else if(getState()==handshaking+1){
+ sendSecondHandshake();
+ }
+
+ if(getState()==invalid)throw new IOException("Can't connect!");
+ if(n++ > 10)throw new IOException("Could not connect to server within the timeout.");
+
+ Thread.sleep(500);
}
+ Thread.sleep(1000);
cc.init();
logger.info("Connected, "+n+" handshake packets sent");
}
@@ -82,38 +94,10 @@
lastPacket=packet;
- if (packet instanceof ConnectionHandshake) {
+ if (packet.isConnectionHandshake()) {
ConnectionHandshake hs=(ConnectionHandshake)packet;
-
- logger.info("Received connection handshake from "+peer+"\n"+hs);
-
- if (getState()!=ready) {
- if(hs.getConnectionType()==1){
- try{
- //TODO validate parameters sent by peer
- long peerSocketID=hs.getSocketID();
- destination.setSocketID(peerSocketID);
- sendConfirmation(hs);
- }catch(Exception ex){
- logger.log(Level.WARNING,"Error creating socket",ex);
- setState(invalid);
- }
- return;
- }
- else{
- try{
- //TODO validate parameters sent by peer
- long peerSocketID=hs.getSocketID();
- destination.setSocketID(peerSocketID);
- setState(ready);
- socket=new UDTSocket(endPoint,this);
- }catch(Exception ex){
- logger.log(Level.WARNING,"Error creating socket",ex);
- setState(invalid);
- }
- return;
- }
- }
+ handleConnectionHandshake(hs,peer);
+ return;
}
if(getState() == ready) {
@@ -140,9 +124,43 @@
}
}
+ protected void handleConnectionHandshake(ConnectionHandshake hs, Destination peer){
- //handshake for connect
- protected void sendHandShake()throws IOException{
+ if (getState()==handshaking) {
+ logger.info("Received initial handshake response from "+peer+"\n"+hs);
+ if(hs.getConnectionType()==ConnectionHandshake.CONNECTION_SERVER_ACK){
+ try{
+ //TODO validate parameters sent by peer
+ long peerSocketID=hs.getSocketID();
+ sessionCookie=hs.getCookie();
+ destination.setSocketID(peerSocketID);
+ setState(handshaking+1);
+ }catch(Exception ex){
+ logger.log(Level.WARNING,"Error creating socket",ex);
+ setState(invalid);
+ }
+ return;
+ }
+ else{
+ logger.info("Unexpected type of handshake packet received");
+ setState(invalid);
+ }
+ }
+ else if(getState()==handshaking+1){
+ try{
+ logger.info("Received confirmation handshake response from "+peer+"\n"+hs);
+ //TODO validate parameters sent by peer
+ setState(ready);
+ socket=new UDTSocket(endPoint,this);
+ }catch(Exception ex){
+ logger.log(Level.WARNING,"Error creating socket",ex);
+ setState(invalid);
+ }
+ }
+ }
+
+ //initial handshake for connect
+ protected void sendInitialHandShake()throws IOException{
ConnectionHandshake handshake = new ConnectionHandshake();
handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR);
handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM);
@@ -153,20 +171,24 @@
handshake.setSocketID(mySocketID);
handshake.setMaxFlowWndSize(flowWindowSize);
handshake.setSession(this);
+ handshake.setAddress(endPoint.getLocalAddress());
logger.info("Sending "+handshake);
endPoint.doSend(handshake);
}
//2nd handshake for connect
- protected void sendConfirmation(ConnectionHandshake hs)throws IOException{
+ protected void sendSecondHandshake()throws IOException{
ConnectionHandshake handshake = new ConnectionHandshake();
- handshake.setConnectionType(-1);
+ handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR);
handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM);
- handshake.setInitialSeqNo(hs.getInitialSeqNo());
- handshake.setPacketSize(hs.getPacketSize());
+ handshake.setInitialSeqNo(initialSequenceNo);
+ handshake.setPacketSize(getDatagramSize());
handshake.setSocketID(mySocketID);
handshake.setMaxFlowWndSize(flowWindowSize);
handshake.setSession(this);
+ handshake.setCookie(sessionCookie);
+ handshake.setAddress(endPoint.getLocalAddress());
+ handshake.setDestinationID(getDestination().getSocketID());
logger.info("Sending confirmation "+handshake);
endPoint.doSend(handshake);
}
Modified: udt-java/trunk/src/main/java/udt/ServerSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/ServerSession.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/ServerSession.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -33,7 +33,6 @@
package udt;
import java.io.IOException;
-import java.net.DatagramPacket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.logging.Level;
@@ -43,6 +42,7 @@
import udt.packets.Destination;
import udt.packets.KeepAlive;
import udt.packets.Shutdown;
+import udt.util.SequenceNumber;
/**
* server side session in client-server mode
@@ -56,10 +56,10 @@
//last received packet (for testing purposes)
private UDTPacket lastPacket;
- public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{
- super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new Destination(dp.getAddress(),dp.getPort()));
+ public ServerSession(Destination peer, UDPEndPoint endPoint)throws SocketException,UnknownHostException{
+ super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+peer.getAddress()+":"+peer.getPort(),peer);
this.endPoint=endPoint;
- logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort());
+ logger.info("Created "+toString()+" talking to "+peer.getAddress()+":"+peer.getPort());
}
int n_handshake=0;
@@ -68,79 +68,45 @@
public void received(UDTPacket packet, Destination peer){
lastPacket=packet;
- if(packet instanceof ConnectionHandshake) {
- ConnectionHandshake connectionHandshake=(ConnectionHandshake)packet;
- logger.info("Received "+connectionHandshake);
+ if(packet.isConnectionHandshake()) {
+ handleHandShake((ConnectionHandshake)packet);
+ return;
+ }
- if (getState()<=ready){
- destination.setSocketID(connectionHandshake.getSocketID());
-
- if(getState()<=handshaking){
- setState(handshaking);
- }
- try{
- handleHandShake(connectionHandshake);
- n_handshake++;
- try{
- setState(ready);
- socket=new UDTSocket(endPoint, this);
- cc.init();
- }catch(Exception uhe){
- //session is invalid
- logger.log(Level.SEVERE,"",uhe);
- setState(invalid);
- }
- }catch(IOException ex){
- //session invalid
- logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex);
- setState(invalid);
- }
- return;
- }
-
- }else if(packet instanceof KeepAlive) {
+ if(packet instanceof KeepAlive) {
socket.getReceiver().resetEXPTimer();
active = true;
return;
}
- if(getState()== ready) {
- active = true;
-
- if (packet instanceof KeepAlive) {
- //nothing to do here
- return;
- }else if (packet instanceof Shutdown) {
- try{
- socket.getReceiver().stop();
- }catch(IOException ex){
- logger.log(Level.WARNING,"",ex);
- }
- setState(shutdown);
- System.out.println("SHUTDOWN ***");
- active = false;
- logger.info("Connection shutdown initiated by the other side.");
- return;
+ if (packet instanceof Shutdown) {
+ try{
+ socket.getReceiver().stop();
+ }catch(IOException ex){
+ logger.log(Level.WARNING,"",ex);
}
+ setState(shutdown);
+ active = false;
+ logger.info("Connection shutdown initiated by peer.");
+ return;
+ }
- else{
- try{
- if(packet.forSender()){
- socket.getSender().receive(packet);
- }else{
- socket.getReceiver().receive(packet);
- }
- }catch(Exception ex){
- //session invalid
- logger.log(Level.SEVERE,"",ex);
- setState(invalid);
+ if(getState() == ready) {
+ active = true;
+ try{
+ if(packet.forSender()){
+ socket.getSender().receive(packet);
+ }else{
+ socket.getReceiver().receive(packet);
}
+ }catch(Exception ex){
+ //session invalid
+ logger.log(Level.SEVERE,"",ex);
+ setState(invalid);
}
return;
-
}
-
}
/**
@@ -151,16 +117,73 @@
}
/**
- * handle the connection handshake:<br/>
- * <ul>
- * <li>set initial sequence number</li>
- * <li>send response handshake</li>
- * </ul>
+ * reply to a connection handshake message
+ * @param connectionHandshake
+ */
+ protected void handleHandShake(ConnectionHandshake connectionHandshake){
+ logger.info("Received "+connectionHandshake + " in state <"+getState()+">");
+ if(getState()==ready){
+ //just send confirmation packet again
+ try{
+ sendFinalHandShake(connectionHandshake);
+ }catch(IOException io){}
+ return;
+ }
+
+ if (getState()<ready){
+ destination.setSocketID(connectionHandshake.getSocketID());
+
+ if(getState()<handshaking){
+ setState(handshaking);
+ }
+
+ try{
+ n_handshake++;
+ boolean handShakeComplete=handleSecondHandShake(connectionHandshake);
+ if(handShakeComplete){
+ logger.info("Client/Server handshake complete!");
+ setState(ready);
+ socket=new UDTSocket(endPoint, this);
+ cc.init();
+ }
+ }catch(IOException ex){
+ //session invalid
+ logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex);
+ setState(invalid);
+ }
+ }
+ }
+
+ private ConnectionHandshake finalConnectionHandshake;
+
+ /**
+ * handle the connection handshake
+ *
* @param handshake
* @param peer
* @throws IOException
*/
- protected void handleHandShake(ConnectionHandshake handshake)throws IOException{
+ protected boolean handleSecondHandShake(ConnectionHandshake handshake)throws IOException{
+ if(sessionCookie==0){
+ ackInitialHandshake(handshake);
+ //need one more handshake
+ return false;
+ }
+
+ long otherCookie=handshake.getCookie();
+ if(sessionCookie!=otherCookie){
+ setState(invalid);
+ throw new IOException("Invalid cookie <"+otherCookie+"> received, my cookie is <"+sessionCookie+">");
+ }
+ sendFinalHandShake(handshake);
+ return true;
+ }
+
+ /*
+ * response after the initial connection handshake received:
+ * compute cookie
+ */
+ protected void ackInitialHandshake(ConnectionHandshake handshake)throws IOException{
ConnectionHandshake responseHandshake = new ConnectionHandshake();
//compare the packet size and choose minimun
long clientBufferSize=handshake.getPacketSize();
@@ -178,12 +201,40 @@
responseHandshake.setSocketID(mySocketID);
responseHandshake.setDestinationID(this.getDestination().getSocketID());
responseHandshake.setSession(this);
+ sessionCookie=SequenceNumber.random();
+ responseHandshake.setCookie(sessionCookie);
+ responseHandshake.setAddress(endPoint.getLocalAddress());
logger.info("Sending reply "+responseHandshake);
endPoint.doSend(responseHandshake);
}
+ protected void sendFinalHandShake(ConnectionHandshake handshake)throws IOException{
+ if(finalConnectionHandshake==null){
+ finalConnectionHandshake= new ConnectionHandshake();
+ //compare the packet size and choose minimun
+ long clientBufferSize=handshake.getPacketSize();
+ long myBufferSize=getDatagramSize();
+ long bufferSize=Math.min(clientBufferSize, myBufferSize);
+ long initialSequenceNumber=handshake.getInitialSeqNo();
+ setInitialSequenceNumber(initialSequenceNumber);
+ setDatagramSize((int)bufferSize);
+ finalConnectionHandshake.setPacketSize(bufferSize);
+ finalConnectionHandshake.setUdtVersion(4);
+ finalConnectionHandshake.setInitialSeqNo(initialSequenceNumber);
+ finalConnectionHandshake.setConnectionType(-1);
+ finalConnectionHandshake.setMaxFlowWndSize(handshake.getMaxFlowWndSize());
+ //tell peer what the socket ID on this side is
+ finalConnectionHandshake.setSocketID(mySocketID);
+ finalConnectionHandshake.setDestinationID(this.getDestination().getSocketID());
+ finalConnectionHandshake.setSession(this);
+ finalConnectionHandshake.setCookie(sessionCookie);
+ finalConnectionHandshake.setAddress(endPoint.getLocalAddress());
+ }
+ logger.info("Sending final handshake ack "+finalConnectionHandshake);
+ endPoint.doSend(finalConnectionHandshake);
+ }
}
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -40,6 +40,8 @@
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
@@ -70,10 +72,12 @@
//last received packet
private UDTPacket lastPacket;
+ private final Map<Destination,UDTSession> sessionsBeingConnected=Collections.synchronizedMap(new HashMap<Destination,UDTSession>());
+
//if the endpoint is configured for a server socket,
//this queue is used to handoff new UDTSessions to the application
private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>();
-
+
private boolean serverSocketMode=false;
//has the endpoint been stopped?
@@ -90,7 +94,7 @@
this.dgSocket=socket;
port=dgSocket.getLocalPort();
}
-
+
/**
* bind to any local port on the given host address
* @param localAddress
@@ -116,7 +120,7 @@
}
if(localPort>0)this.port = localPort;
else port=dgSocket.getLocalPort();
-
+
configureSocket();
}
@@ -127,7 +131,7 @@
dgSocket.setReceiveBufferSize(128*1024);
dgSocket.setReuseAddress(false);
}
-
+
/**
* bind to the default network interface on the machine
*
@@ -240,79 +244,75 @@
* </ul>
* @throws IOException
*/
- private long lastDestID=-1;
- private UDTSession lastSession;
-
- private int n=0;
-
- private final Object lock=new Object();
-
protected void doReceive()throws IOException{
while(!stopped){
try{
- try{
-
- //will block until a packet is received or timeout has expired
- dgSocket.receive(dp);
-
- Destination peer=new Destination(dp.getAddress(), dp.getPort());
- int l=dp.getLength();
- UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
- lastPacket=packet;
+ //will block until a packet is received or timeout has expired
+ dgSocket.receive(dp);
- //handle connection handshake
- if(packet.isConnectionHandshake()){
- synchronized(lock){
- Long id=Long.valueOf(packet.getDestinationID());
- UDTSession session=sessions.get(id);
- if(session==null){
- session=new ServerSession(dp,this);
- addSession(session.getSocketID(),session);
- //TODO need to check peer to avoid duplicate server session
- if(serverSocketMode){
- logger.fine("Pooling new request.");
- sessionHandoff.put(session);
- logger.fine("Request taken for processing.");
- }
- }
- peer.setSocketID(((ConnectionHandshake)packet).getSocketID());
- session.received(packet,peer);
- }
- }
- else{
- //dispatch to existing session
- long dest=packet.getDestinationID();
- UDTSession session;
- if(dest==lastDestID){
- session=lastSession;
- }
- else{
- session=sessions.get(dest);
- lastSession=session;
- lastDestID=dest;
- }
- if(session==null){
- n++;
- if(n%100==1){
- logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName());
- }
- }
- else{
- session.received(packet,peer);
- }
- }
- }catch(SocketException ex){
- logger.log(Level.INFO, "SocketException: "+ex.getMessage());
- }catch(SocketTimeoutException ste){
- //can safely ignore... we will retry until the endpoint is stopped
+ Destination peer=new Destination(dp.getAddress(), dp.getPort());
+ int l=dp.getLength();
+ UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
+ lastPacket=packet;
+
+ long dest=packet.getDestinationID();
+ UDTSession session=sessions.get(dest);
+ if(session!=null){
+ //dispatch to existing session
+ session.received(packet,peer);
}
-
+ else if(packet.isConnectionHandshake()){
+ connectionHandshake((ConnectionHandshake)packet, peer);
+ }
+ else{
+ logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName());
+ }
+ }catch(SocketException ex){
+ logger.log(Level.INFO, "SocketException: "+ex.getMessage());
+ }catch(SocketTimeoutException ste){
+ //can safely ignore... we will retry until the endpoint is stopped
}catch(Exception ex){
logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex);
}
}
}
+ /**
+ * called when a "connection handshake" packet was received and no
+ * matching session yet exists
+ *
+ * @param packet
+ * @param peer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected synchronized void connectionHandshake(ConnectionHandshake packet, Destination peer)throws IOException, InterruptedException{
+ Destination p=new Destination(peer.getAddress(),peer.getPort());
+ UDTSession session=sessionsBeingConnected.get(peer);
+ long destID=packet.getDestinationID();
+ if(session!=null && session.getSocketID()==destID){
+ //confirmation handshake
+ sessionsBeingConnected.remove(p);
+ addSession(destID, session);
+ }
+ else if(session==null){
+ session=new ServerSession(peer,this);
+ sessionsBeingConnected.put(p,session);
+ sessions.put(session.getSocketID(), session);
+ if(serverSocketMode){
+ logger.fine("Pooling new request.");
+ sessionHandoff.put(session);
+ logger.fine("Request taken for processing.");
+ }
+ }
+ else {
+ throw new IOException("dest ID sent by client does not match");
+ }
+ Long peerSocketID=((ConnectionHandshake)packet).getSocketID();
+ peer.setSocketID(peerSocketID);
+ session.received(packet,peer);
+ }
+
protected void doSend(UDTPacket packet)throws IOException{
byte[]data=packet.getEncoded();
DatagramPacket dgp = packet.getSession().getDatagram();
@@ -327,4 +327,5 @@
public void sendRaw(DatagramPacket p)throws IOException{
dgSocket.send(p);
}
+
}
Modified: udt-java/trunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTClient.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTClient.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -80,12 +80,11 @@
//create client session...
clientSession=new ClientSession(clientEndpoint,destination);
clientEndpoint.addSession(clientSession.getSocketID(), clientSession);
-
clientEndpoint.start();
clientSession.connect();
//wait for handshake
while(!clientSession.isReady()){
- Thread.sleep(5);
+ Thread.sleep(50);
}
logger.info("The UDTClient is connected");
}
Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -78,9 +78,19 @@
@Override
public int read()throws IOException{
int b=0;
- while(b==0)
+ while(b==0){
b=read(single);
-
+ if(b==0){
+ try{
+ while(receiveBuffer.isEmpty()){
+ Thread.sleep(20);
+ }
+ }catch(InterruptedException ie){
+ throw new IOException(ie);
+ }
+ }
+ }
+
if(b>0){
return single[0] & 0xFF;
}
@@ -153,9 +163,7 @@
else currentChunk=receiveBuffer.poll(10, TimeUnit.MILLISECONDS);
}catch(InterruptedException ie){
- IOException ex=new IOException();
- ex.initCause(ie);
- throw ex;
+ throw new IOException(ie);
}
return;
}
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -196,9 +196,14 @@
//starts the sender algorithm
private void start(){
+
Runnable r=new Runnable(){
public void run(){
try{
+ while(session.getSocket()==null)Thread.sleep(100);
+ session.getSocket().getInputStream();
+
+ logger.info("STARTING RECEIVER for "+session);
nextACK=Util.getCurrentTime()+ackTimerInterval;
nextNAK=(long)(Util.getCurrentTime()+1.5*nakTimerInterval);
nextEXP=Util.getCurrentTime()+2*expTimerInterval;
@@ -224,6 +229,9 @@
*/
protected void receive(UDTPacket p)throws IOException{
if(storeStatistics)dgReceiveInterval.end();
+ if(!p.isControlPacket()){
+ System.out.println("++ "+p+" queuesize="+handoffQueue.size());
+ }
handoffQueue.offer(p);
if(storeStatistics)dgReceiveInterval.begin();
}
@@ -265,6 +273,7 @@
needEXPReset=true;
}
}
+
if(needEXPReset){
nextEXP=Util.getCurrentTime()+expTimerInterval;
}
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -151,7 +151,7 @@
* start the sender thread
*/
public void start(){
- logger.info("Starting sender for "+session);
+ logger.info("STARTING SENDER for "+session);
startLatch.countDown();
started=true;
}
Modified: udt-java/trunk/src/main/java/udt/UDTServerSocket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTServerSocket.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTServerSocket.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -31,6 +31,7 @@
*********************************************************************************/
package udt;
+
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
@@ -38,8 +39,8 @@
import java.util.logging.Logger;
-
public class UDTServerSocket {
+
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
private final UDPEndPoint endpoint;
Modified: udt-java/trunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSession.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTSession.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -39,6 +39,7 @@
import java.util.logging.Logger;
import udt.packets.Destination;
+import udt.util.SequenceNumber;
import udt.util.UDTStatistics;
public abstract class UDTSession {
@@ -53,9 +54,9 @@
//state constants
public static final int start=0;
public static final int handshaking=1;
- public static final int ready=2;
- public static final int keepalive=3;
- public static final int shutdown=4;
+ public static final int ready=50;
+ public static final int keepalive=80;
+ public static final int shutdown=90;
public static final int invalid=99;
@@ -70,6 +71,9 @@
//cache dgPacket (peer stays the same always)
private DatagramPacket dgPacket;
+ //session cookie created during handshake
+ protected long sessionCookie=0;
+
/**
* flow window size, i.e. how many data packets are
* in-flight at a single time
@@ -209,7 +213,7 @@
public synchronized long getInitialSequenceNumber(){
if(initialSequenceNumber==null){
- initialSequenceNumber=1l; //TODO must be random?
+ initialSequenceNumber=SequenceNumber.random();
}
return initialSequenceNumber;
}
Modified: udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -33,6 +33,8 @@
package udt.packets;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
import udt.UDTSession;
@@ -49,21 +51,29 @@
private long packetSize;
private long maxFlowWndSize;
- public static final long CONNECTION_TYPE_REGULAR=1;
+ public static final long CONNECTION_TYPE_REGULAR=1L;
- public static final long CONNECTION_TYPE_RENDEZVOUS=0;
+ public static final long CONNECTION_TYPE_RENDEZVOUS=0L;
+ /**
+ * connection type in response handshake packet
+ */
+ public static final long CONNECTION_SERVER_ACK=-1L;
+
private long connectionType = CONNECTION_TYPE_REGULAR;//regular or rendezvous mode
private long socketID;
private long cookie=0;
+ //address of the UDP socket
+ private InetAddress address;
+
public ConnectionHandshake(){
this.controlPacketType=ControlPacketType.CONNECTION_HANDSHAKE.ordinal();
}
- public ConnectionHandshake(byte[]controlInformation){
+ public ConnectionHandshake(byte[]controlInformation)throws IOException{
this();
decode(controlInformation);
}
@@ -73,7 +83,7 @@
return true;
}
- void decode(byte[]data){
+ void decode(byte[]data)throws IOException{
udtVersion =PacketUtil.decode(data, 0);
socketType=PacketUtil.decode(data, 4);
initialSeqNo=PacketUtil.decode(data, 8);
@@ -81,9 +91,9 @@
maxFlowWndSize=PacketUtil.decode(data, 16);
connectionType=PacketUtil.decode(data, 20);
socketID=PacketUtil.decode(data, 24);
- if(data.length>28){
- cookie=PacketUtil.decode(data, 28);
- }
+ cookie=PacketUtil.decode(data, 28);
+ //TODO ipv6 check
+ address=PacketUtil.decodeInetAddress(data, 32, false);
}
public long getUdtVersion() {
@@ -134,11 +144,25 @@
public void setSocketID(long socketID) {
this.socketID = socketID;
}
+ public long getCookie() {
+ return cookie;
+ }
+ public void setCookie(long cookie) {
+ this.cookie = cookie;
+ }
+ public InetAddress getAddress() {
+ return address;
+ }
+
+ public void setAddress(InetAddress address) {
+ this.address = address;
+ }
+
@Override
public byte[] encodeControlInformation(){
try {
- ByteArrayOutputStream bos=new ByteArrayOutputStream(24);
+ ByteArrayOutputStream bos=new ByteArrayOutputStream(48);
bos.write(PacketUtil.encode(udtVersion));
bos.write(PacketUtil.encode(socketType));
bos.write(PacketUtil.encode(initialSeqNo));
@@ -146,6 +170,8 @@
bos.write(PacketUtil.encode(maxFlowWndSize));
bos.write(PacketUtil.encode(connectionType));
bos.write(PacketUtil.encode(socketID));
+ bos.write(PacketUtil.encode(cookie));
+ bos.write(PacketUtil.encode(address));
return bos.toByteArray();
} catch (Exception e) {
// can't happen
@@ -178,6 +204,10 @@
return false;
if (udtVersion != other.udtVersion)
return false;
+ if (cookie!=other.cookie)
+ return false;
+ if (!address.equals(other.address))
+ return false;
return true;
}
@@ -198,6 +228,7 @@
sb.append(", socketType=").append(socketType);
sb.append(", destSocketID=").append(destinationID);
if(cookie>0)sb.append(", cookie=").append(cookie);
+ sb.append(", address=").append(address);
sb.append("]");
return sb.toString();
}
Modified: udt-java/trunk/src/main/java/udt/packets/PacketFactory.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -31,6 +31,8 @@
*********************************************************************************/
package udt.packets;
+import java.io.IOException;
+
import udt.UDTPacket;
import udt.packets.ControlPacket.*;
@@ -42,13 +44,13 @@
* @param packetData
* @return
*/
- public static UDTPacket createPacket(byte[]encodedData){
+ public static UDTPacket createPacket(byte[]encodedData)throws IOException{
boolean isControl=(encodedData[0]&128) !=0 ;
if(isControl)return createControlPacket(encodedData,encodedData.length);
return new DataPacket(encodedData);
}
- public static UDTPacket createPacket(byte[]encodedData,int length){
+ public static UDTPacket createPacket(byte[]encodedData,int length)throws IOException{
boolean isControl=(encodedData[0]&128) !=0 ;
if(isControl)return createControlPacket(encodedData,length);
return new DataPacket(encodedData,length);
@@ -59,7 +61,7 @@
* @param packetData
* @return
*/
- public static ControlPacket createControlPacket(byte[]encodedData,int length){
+ public static ControlPacket createControlPacket(byte[]encodedData,int length)throws IOException{
ControlPacket packet=null;
Modified: udt-java/trunk/src/main/java/udt/packets/PacketUtil.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -32,7 +32,10 @@
package udt.packets;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
public class PacketUtil {
public static byte[]encode(long value){
@@ -80,4 +83,30 @@
return result;
}
+ /**
+ * encodes the specified address into 128 bit
+ * @param address - inet address
+ */
+ public static byte[] encode(InetAddress address){
+ byte[]res=new byte[16];
+ byte[]add=address.getAddress();
+ System.arraycopy(add, 0, res, 0, add.length);
+ return res;
+ }
+
+ public static InetAddress decodeInetAddress(byte[]data, int start, boolean ipV6)throws UnknownHostException{
+ InetAddress result=null;
+ byte[] add=ipV6?new byte[16]:new byte[4];
+ System.arraycopy(data, start, add, 0, add.length);
+ result=InetAddress.getByAddress(add);
+ return result;
+ }
+
+ public static void print(byte[]arr){
+ System.out.print("[");
+ for(byte b: arr){
+ System.out.print(" "+(b&0xFF));
+ }
+ System.out.println(" ]");
+ }
}
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -54,7 +54,9 @@
try{
long seq=data.getSequenceNumber();
//if already have this chunk, discard it
- if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0)return true;
+ if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0){
+ return true;
+ }
//else compute insert position
int offset=(int)SequenceNumber.seqOffset(initialSequenceNumber, seq);
int insert=offset% size;
Modified: udt-java/trunk/src/main/java/udt/util/SequenceNumber.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/SequenceNumber.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/util/SequenceNumber.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -13,7 +13,7 @@
private final static long maxSequenceNo=0x7FFFFFFF;
-
+ private final static Random rand=new Random();
/**
* compare seq1 and seq2. Returns zero, if they are equal, a negative value if seq1 is smaller than
* seq2, and a positive value if seq1 is larger than seq2.
@@ -67,7 +67,7 @@
* generates a random number between 1 and 0x3FFFFFFF (inclusive)
*/
public static long random(){
- return 1+new Random().nextInt(maxOffset);
+ return 1+rand.nextInt(maxOffset);
}
}
Modified: udt-java/trunk/src/test/java/echo/EchoServer.java
===================================================================
--- udt-java/trunk/src/test/java/echo/EchoServer.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/test/java/echo/EchoServer.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -78,7 +78,7 @@
String line=readLine(in);
if(line!=null){
System.out.println("ECHO: "+line);
- //else echo back the line
+ //echo back the line
writer.println(line);
writer.flush();
}
Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -101,7 +101,6 @@
try{
for(int i=0;i<blocks.length;i++){
while(!is.haveNewData(i+1, blocks[i])){
- Thread.yield();
Thread.sleep(100);
}
}
Modified: udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java
===================================================================
--- udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -3,17 +3,20 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import udt.UDTPacket;
+import udt.util.SequenceNumber;
public class TestPacketFactory {
@Test
- public void testData(){
+ public void testData()throws IOException{
String test="sdjfsdjfldskjflds";
byte[]data=test.getBytes();
@@ -26,7 +29,7 @@
}
@Test
- public void testConnectionHandshake(){
+ public void testConnectionHandshake()throws IOException{
ConnectionHandshake p1 = new ConnectionHandshake();
p1.setMessageNumber(9876);
p1.setTimeStamp(3456);
@@ -39,8 +42,9 @@
p1.setMaxFlowWndSize(128);
p1.setSocketID(1);
p1.setUdtVersion(4);
-
-
+ p1.setAddress(InetAddress.getLocalHost());
+ p1.setCookie(SequenceNumber.random());
+
byte[]p1_data=p1.getEncoded();
UDTPacket p=PacketFactory.createPacket(p1_data);
@@ -50,7 +54,7 @@
}
@Test
- public void testAcknowledgement(){
+ public void testAcknowledgement()throws IOException{
Acknowledgement p1 = new Acknowledgement();
p1.setAckSequenceNumber(1234);
p1.setMessageNumber(9876);
@@ -70,7 +74,7 @@
}
@Test
- public void testAcknowledgementOfAcknowledgement(){
+ public void testAcknowledgementOfAcknowledgement()throws IOException{
Acknowledgment2 p1 = new Acknowledgment2();
p1.setAckSequenceNumber(1230);
p1.setMessageNumber(9871);
@@ -86,7 +90,7 @@
}
@Test
- public void testNegativeAcknowledgement(){
+ public void testNegativeAcknowledgement()throws IOException{
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
p1.setMessageNumber(9872);
p1.setTimeStamp(3452);
@@ -105,7 +109,7 @@
}
@Test
- public void testNegativeAcknowledgement2(){
+ public void testNegativeAcknowledgement2()throws IOException{
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
p1.setMessageNumber(9872);
p1.setTimeStamp(3452);
@@ -130,7 +134,7 @@
}
@Test
- public void testNegativeAcknowledgement3(){
+ public void testNegativeAcknowledgement3()throws IOException{
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
p1.setMessageNumber(9872);
p1.setTimeStamp(3452);
@@ -148,13 +152,12 @@
}
@Test
- public void testShutdown(){
+ public void testShutdown()throws IOException{
Shutdown p1 = new Shutdown();
p1.setMessageNumber(9874);
p1.setTimeStamp(3453);
p1.setDestinationID(3);
-
byte[]p1_data=p1.getEncoded();
UDTPacket p=PacketFactory.createPacket(p1_data);
@@ -164,7 +167,7 @@
@Test
- public void testMessageDropRequest(){
+ public void testMessageDropRequest()throws Exception{
MessageDropRequest p1=new MessageDropRequest();
p1.setMessageNumber(9876);
p1.setTimeStamp(3456);
@@ -181,5 +184,15 @@
MessageDropRequest p2=(MessageDropRequest)p;
assertEquals(p1,p2);
}
+
+ @Test
+ public void testPacketUtil()throws Exception{
+ InetAddress i=InetAddress.getLocalHost();
+ byte[]enc=PacketUtil.encode(i);
+ PacketUtil.print(enc);
+ InetAddress i2=PacketUtil.decodeInetAddress(enc, 0, false);
+ System.out.println(i2);
+ assertEquals(i, i2);
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2012-03-28 06:34:22
|
Revision: 73
http://udt-java.svn.sourceforge.net/udt-java/?rev=73&view=rev
Author: bschuller
Date: 2012-03-28 06:34:15 +0000 (Wed, 28 Mar 2012)
Log Message:
-----------
avoid socket closed exception
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTReceiver.java
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-02-06 10:06:38 UTC (rev 72)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-03-28 06:34:15 UTC (rev 73)
@@ -334,7 +334,7 @@
* process EXP event (see spec. p 13)
*/
protected void processEXPEvent()throws IOException{
- if(session.getSocket()==null)return;
+ if(session.getSocket()==null || !session.getSocket().isActive())return;
UDTSender sender=session.getSocket().getSender();
//put all the unacknowledged packets in the senders loss list
sender.putUnacknowledgedPacketsIntoLossList();
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2012-02-06 10:06:48
|
Revision: 72
http://udt-java.svn.sourceforge.net/udt-java/?rev=72&view=rev
Author: bschuller
Date: 2012-02-06 10:06:38 +0000 (Mon, 06 Feb 2012)
Log Message:
-----------
keep track of data array length
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/packets/DataPacket.java
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2012-02-06 08:26:01 UTC (rev 71)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2012-02-06 10:06:38 UTC (rev 72)
@@ -200,7 +200,11 @@
throughput.end();
throughput.begin();
}
- sendBuffer.put(p.getPacketSequenceNumber(), p.getData());
+ //store data for potential retransmit
+ int l=p.getLength();
+ byte[]data=new byte[l];
+ System.arraycopy(p.getData(), 0, data, 0, l);
+ sendBuffer.put(p.getPacketSequenceNumber(), data);
unacknowledged.incrementAndGet();
}
statistics.incNumberOfSentDataPackets();
Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2012-02-06 08:26:01 UTC (rev 71)
+++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2012-02-06 10:06:38 UTC (rev 72)
@@ -78,7 +78,7 @@
return this.data;
}
- public double getLength(){
+ public int getLength(){
return dataLength;
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2012-02-06 08:26:10
|
Revision: 71
http://udt-java.svn.sourceforge.net/udt-java/?rev=71&view=rev
Author: bschuller
Date: 2012-02-06 08:26:01 +0000 (Mon, 06 Feb 2012)
Log Message:
-----------
use system-specific line terminator char
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/util/Util.java
Modified: udt-java/trunk/src/main/java/udt/util/Util.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/Util.java 2012-02-03 09:09:37 UTC (rev 70)
+++ udt-java/trunk/src/main/java/udt/util/Util.java 2012-02-06 08:26:01 UTC (rev 71)
@@ -86,7 +86,8 @@
* @throws IOException
*/
public static String readLine(InputStream input)throws IOException{
- return readLine(input, '\n');
+ char term=System.getProperty("line.separator").charAt(0);
+ return readLine(input, term);
}
/**
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2012-02-03 09:09:43
|
Revision: 70
http://udt-java.svn.sourceforge.net/udt-java/?rev=70&view=rev
Author: bschuller
Date: 2012-02-03 09:09:37 +0000 (Fri, 03 Feb 2012)
Log Message:
-----------
roll back change to FlowWindow; add some comments to make it easier to understand
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java
udt-java/trunk/src/test/java/udt/performance/UDPTest.java
udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
Modified: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2012-02-03 06:57:00 UTC (rev 69)
+++ udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2012-02-03 09:09:37 UTC (rev 70)
@@ -6,10 +6,12 @@
/**
*
- * holds a fixed number of {@link DataPacket} instances which are sent out.
+ * Holds a fixed number of {@link DataPacket} instances which are sent out.<br/>
*
- * it is assumed that a single thread stores new data, and another single thread
- * reads/removes data
+ * it is assumed that a single thread (the producer) stores new data,
+ * and another single thread (the consumer) reads/removes data.<br/>
+ *
+ *
*
* @author schuller
*/
@@ -23,12 +25,15 @@
private volatile boolean isFull=false;
+ //valid entries that can be read
private volatile int validEntries=0;
private volatile boolean isCheckout=false;
+ //index where the next data packet will be written to
private volatile int writePos=0;
+ //one before the index where the next data packet will be read from
private volatile int readPos=-1;
private volatile int consumed=0;
@@ -42,7 +47,7 @@
* @param chunksize - data chunk size
*/
public FlowWindow(int size, int chunksize){
- this.length=size;
+ this.length=size+1;
packets=new DataPacket[length];
for(int i=0;i<packets.length;i++){
packets[i]=new DataPacket();
@@ -64,21 +69,25 @@
}
if(isCheckout)throw new IllegalStateException();
isCheckout=true;
- DataPacket p=packets[writePos];
- return p;
+ return packets[writePos];
}finally{
lock.unlock();
}
}
+ /**
+ * notify the flow window that the data packet obtained by {@link #getForProducer()}
+ * has been filled with data and is ready for sending out
+ */
public void produce(){
lock.lock();
try{
+ if(!isCheckout)throw new IllegalStateException();
isCheckout=false;
writePos++;
if(writePos==length)writePos=0;
validEntries++;
- isFull=validEntries==length;
+ isFull=validEntries==length-1;
isEmpty=false;
produced++;
}finally{
@@ -88,11 +97,11 @@
public DataPacket consumeData(){
- if(isEmpty){
- return null;
- }
lock.lock();
try{
+ if(isEmpty){
+ return null;
+ }
readPos++;
DataPacket p=packets[readPos];
if(readPos==length-1)readPos=-1;
@@ -133,6 +142,7 @@
StringBuilder sb=new StringBuilder();
sb.append("FlowWindow size=").append(length);
sb.append(" full=").append(isFull).append(" empty=").append(isEmpty);
+ sb.append(" readPos=").append(readPos).append(" writePos=").append(writePos);
sb.append(" consumed=").append(consumed).append(" produced=").append(produced);
return sb.toString();
}
Modified: udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2012-02-03 06:57:00 UTC (rev 69)
+++ udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2012-02-03 09:09:37 UTC (rev 70)
@@ -25,9 +25,9 @@
@Test
public void testWithoutLoss()throws Exception{
- Logger.getLogger("udt").setLevel(Level.WARNING);
+ Logger.getLogger("udt").setLevel(Level.INFO);
UDTReceiver.dropRate=0;
- num_packets=640;
+ num_packets=1000;
TIMEOUT=Integer.MAX_VALUE;
doTest();
}
@@ -37,9 +37,9 @@
public void testWithLoss()throws Exception{
UDTReceiver.dropRate=3;
TIMEOUT=Integer.MAX_VALUE;
- num_packets=512;
+ num_packets=100;
//set log level
- Logger.getLogger("udt").setLevel(Level.WARNING);
+ Logger.getLogger("udt").setLevel(Level.INFO);
doTest();
}
@@ -48,9 +48,9 @@
public void testLargeDataSet()throws Exception{
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
- num_packets=3*1024;
+ num_packets=100;
//set log level
- Logger.getLogger("udt").setLevel(Level.WARNING);
+ Logger.getLogger("udt").setLevel(Level.INFO);
doTest();
}
Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2012-02-03 06:57:00 UTC (rev 69)
+++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2012-02-03 09:09:37 UTC (rev 70)
@@ -6,6 +6,7 @@
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
@@ -18,7 +19,7 @@
*/
public class UDPTest {
- final int num_packets=10*10*1000;
+ final int num_packets=1000;
final int packetSize=UDPEndPoint.DATAGRAM_SIZE;
@Test
@@ -48,6 +49,7 @@
dgSendInterval.end();
dgSendTime.begin();
s.send(dp);
+ Thread.sleep(5);
dgSendTime.end();
dgSendInterval.begin();
}
@@ -76,11 +78,10 @@
public void run(){
try{
byte[]buf=new byte[packetSize];
- DatagramPacket dp=new DatagramPacket(buf,buf.length);
while(true){
+ DatagramPacket dp=new DatagramPacket(buf,buf.length);
serverSocket.receive(dp);
handoff.offer(dp);
- total+=dp.getLength();
}
}
catch(Exception e){
@@ -91,6 +92,7 @@
};
Thread t=new Thread(serverProcess);
t.start();
+ System.out.println("Server started.");
}
private final BlockingQueue<DatagramPacket> handoff=new SynchronousQueue<DatagramPacket>();
@@ -99,11 +101,15 @@
Runnable serverProcess=new Runnable(){
public void run(){
try{
+ int counter=0;
long start=System.currentTimeMillis();
- while(true){
- DatagramPacket dp=handoff.poll();
- if(dp!=null)total+=dp.getLength();
- if(total==N)break;
+ while(counter<num_packets){
+ DatagramPacket dp=handoff.poll(10, TimeUnit.MILLISECONDS);
+ if(dp!=null){
+ total+=dp.getLength();
+ counter++;
+ System.out.println("Count: "+counter);
+ }
}
long end=System.currentTimeMillis();
System.out.println("Server time: "+(end-start)+" ms.");
@@ -117,6 +123,7 @@
};
Thread t=new Thread(serverProcess);
t.start();
+ System.out.println("Hand-off thread started.");
}
Modified: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
===================================================================
--- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2012-02-03 06:57:00 UTC (rev 69)
+++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2012-02-03 09:09:37 UTC (rev 70)
@@ -33,6 +33,7 @@
DataPacket no=fw.getForProducer();
assertNull("Window should be full",no);
+ assertTrue(fw.isFull());
DataPacket c1=fw.consumeData();
//must be p1
@@ -73,14 +74,21 @@
DataPacket p4=fw.getForProducer();
assertNotNull(p4);
fw.produce();
+ fw.consumeData();
+
+ DataPacket p5=fw.getForProducer();
+ assertNotNull(p5);
+ fw.produce();
+
//which is again p1
- assertTrue(p4==p1);
+ assertTrue(p5==p1);
}
private volatile boolean fail=false;
- public void testConcurrentReadWrite()throws InterruptedException{
+ @Test
+ public void testConcurrentReadWrite_20()throws InterruptedException{
final FlowWindow fw=new FlowWindow(20, 64);
Thread reader=new Thread(new Runnable(){
public void run(){
@@ -107,6 +115,34 @@
}
+ @Test
+ public void testConcurrentReadWrite_2()throws InterruptedException{
+ final FlowWindow fw=new FlowWindow(2, 64);
+ Thread reader=new Thread(new Runnable(){
+ public void run(){
+ doRead(fw);
+ }
+ });
+ reader.setName("reader");
+ Thread writer=new Thread(new Runnable(){
+ public void run(){
+ doWrite(fw);
+ }
+ });
+ writer.setName("writer");
+
+ writer.start();
+ reader.start();
+
+ int c=0;
+ while(read && write && c<10){
+ Thread.sleep(1000);
+ c++;
+ }
+ assertFalse("An error occured in reader or writer",fail);
+
+ }
+
volatile boolean read=true;
volatile boolean write=true;
int N=100000;
@@ -123,6 +159,7 @@
}
}catch(Throwable ex){
ex.printStackTrace();
+ System.out.println(fw);
fail=true;
}
System.out.println("Exiting reader...");
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2012-02-03 06:57:07
|
Revision: 69
http://udt-java.svn.sourceforge.net/udt-java/?rev=69&view=rev
Author: bschuller
Date: 2012-02-03 06:57:00 +0000 (Fri, 03 Feb 2012)
Log Message:
-----------
use Junit4
Modified Paths:
--------------
udt-java/trunk/LICENSE
udt-java/trunk/pom.xml
udt-java/trunk/src/test/java/echo/TestEchoServer.java
udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java
udt-java/trunk/src/test/java/udt/TestList.java
udt-java/trunk/src/test/java/udt/TestReceiverLossList.java
udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
udt-java/trunk/src/test/java/udt/TestUDTInputStream.java
udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java
udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java
udt-java/trunk/src/test/java/udt/UDTTestBase.java
udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java
udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java
udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java
udt-java/trunk/src/test/java/udt/performance/TCPTest.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java
udt-java/trunk/src/test/java/udt/performance/UDPTest.java
udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
Modified: udt-java/trunk/LICENSE
===================================================================
--- udt-java/trunk/LICENSE 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/LICENSE 2012-02-03 06:57:00 UTC (rev 69)
@@ -1,5 +1,5 @@
/*********************************************************************************
- * Copyright (c) 2010 Forschungszentrum Juelich GmbH
+ * Copyright (c) 2010-2012 Forschungszentrum Juelich GmbH
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Modified: udt-java/trunk/pom.xml
===================================================================
--- udt-java/trunk/pom.xml 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/pom.xml 2012-02-03 06:57:00 UTC (rev 69)
@@ -27,7 +27,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>3.8.1</version>
+ <version>4.8.2</version>
<scope>test</scope>
</dependency>
</dependencies>
Modified: udt-java/trunk/src/test/java/echo/TestEchoServer.java
===================================================================
--- udt-java/trunk/src/test/java/echo/TestEchoServer.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/echo/TestEchoServer.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -4,12 +4,16 @@
import java.io.PrintWriter;
import java.net.InetAddress;
-import junit.framework.TestCase;
+import junit.framework.Assert;
+
+import org.junit.Test;
+
import udt.UDTClient;
import udt.util.Util;
-public class TestEchoServer extends TestCase {
+public class TestEchoServer {
+ @Test
public void test1()throws Exception{
EchoServer es=new EchoServer(65321);
es.start();
@@ -22,9 +26,9 @@
System.out.println("Message sent.");
client.getInputStream().setBlocking(false);
String line=Util.readLine(client.getInputStream());
- assertNotNull(line);
+ Assert.assertNotNull(line);
System.out.println(line);
- assertEquals("test",line);
+ Assert.assertEquals("test",line);
}
}
Modified: udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java
===================================================================
--- udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -4,12 +4,16 @@
import java.io.PrintWriter;
import java.net.InetAddress;
-import junit.framework.TestCase;
+import junit.framework.Assert;
+
+import org.junit.Test;
+
import udt.UDTClient;
import udt.util.Util;
-public class TestEchoServerMultiClient extends TestCase {
+public class TestEchoServerMultiClient {
+ @Test
public void testTwoClients()throws Exception{
EchoServer es=new EchoServer(65321);
es.start();
@@ -33,8 +37,8 @@
System.out.println("Message sent.");
client.getInputStream().setBlocking(false);
String line=Util.readLine(client.getInputStream());
- assertNotNull(line);
+ Assert.assertNotNull(line);
System.out.println(line);
- assertEquals("test",line);
+ Assert.assertEquals("test",line);
}
}
Modified: udt-java/trunk/src/test/java/udt/TestList.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestList.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/TestList.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -1,8 +1,13 @@
package udt;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
-import junit.framework.TestCase;
+import org.junit.Test;
+
import udt.packets.DataPacket;
import udt.packets.KeepAlive;
import udt.receiver.AckHistoryEntry;
@@ -15,8 +20,9 @@
/*
* tests for the various list and queue classes
*/
-public class TestList extends TestCase{
+public class TestList {
+ @Test
public void testCircularArray(){
CircularArray<Integer>c=new CircularArray<Integer>(5);
for(int i=0;i<5;i++)c.add(i);
@@ -50,14 +56,14 @@
for(int i=0;i<values.length;i++){
p.add(values[i]);
}
- assertEquals(4.0d, p.computeMedianTimeInterval());
+ assertEquals(4.0d, p.computeMedianTimeInterval(), 0.001d);
long[] arrivaltimes = {12, 12, 12, 12};
PacketPairWindow p1=new PacketPairWindow(16);
for(int i=0;i<values.length;i++){
p1.add(arrivaltimes[i]);
}
- assertEquals(12.0d, p1.computeMedianTimeInterval());
+ assertEquals(12.0d, p1.computeMedianTimeInterval(), 0.001d);
}
Modified: udt-java/trunk/src/test/java/udt/TestReceiverLossList.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestReceiverLossList.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/TestReceiverLossList.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -1,11 +1,15 @@
package udt;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
import udt.receiver.ReceiverLossList;
import udt.receiver.ReceiverLossListEntry;
-import junit.framework.TestCase;
-public class TestReceiverLossList extends TestCase {
+public class TestReceiverLossList {
+ @Test
public void test1(){
ReceiverLossList l=new ReceiverLossList();
ReceiverLossListEntry e1=new ReceiverLossListEntry(1);
Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -1,8 +1,12 @@
package udt;
+import static org.junit.Assert.assertEquals;
+
import java.io.File;
import java.io.FileInputStream;
+import org.junit.Test;
+
import udt.util.ReceiveFile;
import udt.util.SendFile;
import udt.util.UDTThreadFactory;
@@ -11,6 +15,7 @@
volatile boolean serverStarted=false;
+ @Test
public void test1()throws Exception{
runServer();
do{
Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -6,10 +6,17 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
+import junit.framework.Assert;
+
+import org.junit.Test;
+
import udt.util.Util;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class TestUDTInputStream extends UDTTestBase{
+ @Test
public void test1()throws Exception{
UDTInputStream is=new UDTInputStream(null);
byte[] data1="this is ".getBytes();
@@ -25,6 +32,7 @@
assertEquals(digest,readMD5);
}
+ @Test
public void test2()throws Exception{
UDTInputStream is=new UDTInputStream(null);
byte[] data1=getRandomData(65537);
@@ -40,6 +48,7 @@
assertEquals(digest,readMD5);
}
+ @Test
public void testInOrder()throws Exception{
UDTInputStream is=new UDTInputStream(null);
is.setBlocking(false);
@@ -57,6 +66,7 @@
assertEquals(digest,readMD5);
}
+ @Test
public void testRandomOrder()throws Exception{
UDTInputStream is=new UDTInputStream(null);
is.setBlocking(false);
@@ -76,7 +86,7 @@
}
-
+ @Test
public void testLargeDataSetTwoThreads()throws Exception{
final UDTInputStream is=new UDTInputStream(null);
is.setBlocking(false);
@@ -98,7 +108,7 @@
is.noMoreData();
}catch(Exception e){
e.printStackTrace();
- fail();
+ Assert.fail();
}
}
};
Modified: udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -1,11 +1,16 @@
package udt;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
import java.net.InetAddress;
import java.security.MessageDigest;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.junit.Test;
+
import udt.util.Util;
public class TestUDTServerSocket extends UDTTestBase{
@@ -18,6 +23,7 @@
int TIMEOUT=20000;
+ @Test
public void testWithoutLoss()throws Exception{
Logger.getLogger("udt").setLevel(Level.WARNING);
UDTReceiver.dropRate=0;
@@ -27,6 +33,7 @@
}
//set an artificial loss rate
+ @Test
public void testWithLoss()throws Exception{
UDTReceiver.dropRate=3;
TIMEOUT=Integer.MAX_VALUE;
@@ -37,6 +44,7 @@
}
//send even more data
+ @Test
public void testLargeDataSet()throws Exception{
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
@@ -97,6 +105,7 @@
Runnable serverProcess=new Runnable(){
public void run(){
try{
+ System.out.println("Starting server.");
long start=System.currentTimeMillis();
UDTSocket s=serverSocket.accept();
assertNotNull(s);
@@ -121,7 +130,6 @@
}
catch(Exception e){
e.printStackTrace();
- fail();
serverRunning=false;
}
}
Modified: udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -1,14 +1,20 @@
package udt;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.junit.Test;
+
import udt.packets.Destination;
public class TestUdpEndpoint extends UDTTestBase{
+ @Test
public void testClientServerMode()throws Exception{
//select log level
@@ -41,6 +47,7 @@
* just check how fast we can send out UDP packets from the endpoint
* @throws Exception
*/
+ @Test
public void testRawSendRate()throws Exception{
Logger.getLogger("udt").setLevel(Level.WARNING);
System.out.println("Checking raw UDP send rate...");
@@ -65,11 +72,12 @@
Thread.sleep(1000);
}
- //no rendezvous yet...
- public void x_testRendezvousConnect()throws Exception{
+ //@Test()
+ public void testRendezvousConnect()throws Exception{
}
+ @Test
public void testBindToAnyPort()throws Exception{
UDPEndPoint ep=new UDPEndPoint(InetAddress.getByName("localhost"));
int port=ep.getLocalPort();
Modified: udt-java/trunk/src/test/java/udt/UDTTestBase.java
===================================================================
--- udt-java/trunk/src/test/java/udt/UDTTestBase.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/UDTTestBase.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -7,12 +7,10 @@
import udt.util.Util;
-import junit.framework.TestCase;
-
/**
* some additional utilities useful for testing
*/
-public abstract class UDTTestBase extends TestCase{
+public abstract class UDTTestBase {
//get an array filled with random data
protected byte[] getRandomData(int size){
Modified: udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java
===================================================================
--- udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -1,18 +1,22 @@
package udt.packets;
-import udt.packets.ControlPacket;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
import udt.packets.ControlPacket.ControlPacketType;
-import junit.framework.TestCase;
-public class TestControlPacketType extends TestCase {
+public class TestControlPacketType {
+ @Test
public void testSequenceNumber1(){
ControlPacket p=new DummyControlPacket();
byte[]x=p.getHeader();
byte highest=x[0];
assertEquals(128, highest & 0x80);
}
-
+
+ @Test
public void testControlPacketTypes(){
ControlPacketType t=ControlPacketType.CONNECTION_HANDSHAKE;
assertEquals(0,t.ordinal());
Modified: udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java
===================================================================
--- udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -1,10 +1,11 @@
package udt.packets;
-import junit.framework.TestCase;
-import udt.packets.DataPacket;
+import static org.junit.Assert.assertEquals;
-public class TestDataPacket extends TestCase {
+import org.junit.Test;
+public class TestDataPacket {
+ @Test
public void testSequenceNumber1(){
DataPacket p=new DataPacket();
p.setPacketSequenceNumber(1);
@@ -17,6 +18,7 @@
assertEquals(1, lowest);
}
+ @Test
public void testEncoded(){
DataPacket p=new DataPacket();
p.setPacketSequenceNumber(1);
@@ -30,9 +32,9 @@
System.out.println("String s = " + s);
}
-
+
+ @Test
public void testDecode1(){
-
DataPacket testPacket1=new DataPacket();
testPacket1.setPacketSequenceNumber(127);
testPacket1.setDestinationID(1);
@@ -74,6 +76,7 @@
}
+ @Test
public void testEncodeDecode1(){
DataPacket dp=new DataPacket();
dp.setPacketSequenceNumber(127);
Modified: udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java
===================================================================
--- udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -1,24 +1,21 @@
package udt.packets;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.util.ArrayList;
import java.util.List;
-import junit.framework.TestCase;
+import org.junit.Test;
+
import udt.UDTPacket;
-import udt.packets.Acknowledgement;
-import udt.packets.Acknowledgment2;
-import udt.packets.ConnectionHandshake;
-import udt.packets.DataPacket;
-import udt.packets.MessageDropRequest;
-import udt.packets.NegativeAcknowledgement;
-import udt.packets.PacketFactory;
-import udt.packets.Shutdown;
-public class TestPacketFactory extends TestCase {
+public class TestPacketFactory {
+ @Test
public void testData(){
String test="sdjfsdjfldskjflds";
-
+
byte[]data=test.getBytes();
data[0]=(byte)(data[0] & 0x7f);
UDTPacket p=PacketFactory.createPacket(data);
@@ -27,14 +24,14 @@
assertTrue(p instanceof DataPacket);
assertEquals(test,t);
}
-
-
+
+ @Test
public void testConnectionHandshake(){
ConnectionHandshake p1 = new ConnectionHandshake();
p1.setMessageNumber(9876);
p1.setTimeStamp(3456);
p1.setDestinationID(1);
-
+
p1.setConnectionType(1);
p1.setSocketType(1);
p1.setInitialSeqNo(321);
@@ -42,16 +39,17 @@
p1.setMaxFlowWndSize(128);
p1.setSocketID(1);
p1.setUdtVersion(4);
-
-
+
+
byte[]p1_data=p1.getEncoded();
-
+
UDTPacket p=PacketFactory.createPacket(p1_data);
ConnectionHandshake p2=(ConnectionHandshake)p;
assertEquals(p1,p2);
-
+
}
-
+
+ @Test
public void testAcknowledgement(){
Acknowledgement p1 = new Acknowledgement();
p1.setAckSequenceNumber(1234);
@@ -64,28 +62,30 @@
p1.setPacketReceiveRate(1000);
p1.setRoundTripTime(1000);
p1.setRoundTripTimeVar(500);
-
+
byte[]p1_data=p1.getEncoded();
UDTPacket p=PacketFactory.createPacket(p1_data);
Acknowledgement p2=(Acknowledgement)p;
assertEquals(p1,p2);
}
-
+
+ @Test
public void testAcknowledgementOfAcknowledgement(){
Acknowledgment2 p1 = new Acknowledgment2();
p1.setAckSequenceNumber(1230);
p1.setMessageNumber(9871);
p1.setTimeStamp(3451);
p1.setDestinationID(1);
-
+
byte[]p1_data=p1.getEncoded();
UDTPacket p=PacketFactory.createPacket(p1_data);
Acknowledgment2 p2=(Acknowledgment2)p;
assertEquals(p1,p2);
-
-
+
+
}
-
+
+ @Test
public void testNegativeAcknowledgement(){
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
p1.setMessageNumber(9872);
@@ -95,15 +95,16 @@
p1.addLossInfo(6);
p1.addLossInfo(7, 10);
byte[]p1_data=p1.getEncoded();
-
+
UDTPacket p=PacketFactory.createPacket(p1_data);
NegativeAcknowledgement p2=(NegativeAcknowledgement)p;
assertEquals(p1,p2);
-
+
assertEquals((Integer)5, (Integer)p2.getDecodedLossInfo().get(0));
assertEquals(6, p2.getDecodedLossInfo().size());
}
-
+
+ @Test
public void testNegativeAcknowledgement2(){
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
p1.setMessageNumber(9872);
@@ -116,18 +117,19 @@
loss.add(8l);
loss.add(9l);
loss.add(11l);
-
+
p1.addLossInfo(loss);
byte[]p1_data=p1.getEncoded();
-
+
UDTPacket p=PacketFactory.createPacket(p1_data);
NegativeAcknowledgement p2=(NegativeAcknowledgement)p;
assertEquals(p1,p2);
-
+
assertEquals((Integer)5, (Integer)p2.getDecodedLossInfo().get(0));
assertEquals(6, p2.getDecodedLossInfo().size());
}
+ @Test
public void testNegativeAcknowledgement3(){
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
p1.setMessageNumber(9872);
@@ -137,42 +139,43 @@
p1.addLossInfo(6);
p1.addLossInfo(147, 226);
byte[]p1_data=p1.getEncoded();
-
+
UDTPacket p=PacketFactory.createPacket(p1_data);
NegativeAcknowledgement p2=(NegativeAcknowledgement)p;
assertEquals(p1,p2);
-
-
+
+
}
-
- public void testShutdown(){
+
+ @Test
+ public void testShutdown(){
Shutdown p1 = new Shutdown();
p1.setMessageNumber(9874);
p1.setTimeStamp(3453);
p1.setDestinationID(3);
-
-
+
+
byte[]p1_data=p1.getEncoded();
-
+
UDTPacket p=PacketFactory.createPacket(p1_data);
Shutdown p2=(Shutdown)p;
assertEquals(p1,p2);
}
-
-
-
+
+
+ @Test
public void testMessageDropRequest(){
MessageDropRequest p1=new MessageDropRequest();
p1.setMessageNumber(9876);
p1.setTimeStamp(3456);
p1.setDestinationID(4);
-
+
p1.setMsgFirstSeqNo(2);
p1.setMsgLastSeqNo(3);
-
-
+
+
byte[]p1_data=p1.getEncoded();
-
+
UDTPacket p=PacketFactory.createPacket(p1_data);
assertTrue(p instanceof MessageDropRequest);
MessageDropRequest p2=(MessageDropRequest)p;
Modified: udt-java/trunk/src/test/java/udt/performance/TCPTest.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TCPTest.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/performance/TCPTest.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -6,17 +6,18 @@
import java.net.Socket;
import java.util.Random;
-import junit.framework.TestCase;
+import org.junit.Test;
/**
* send some data over a TCP connection and measure performance
*
*/
-public class TCPTest extends TestCase {
+public class TCPTest {
int BUFSIZE=1024;
int num_packets=10*1000;
+ @Test
public void test1()throws Exception{
runServer();
//client socket
@@ -59,7 +60,7 @@
}
catch(Exception e){
e.printStackTrace();
- fail();
+ serverRunning=false;
}
}
};
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -1,5 +1,7 @@
package udt.performance;
+import static org.junit.Assert.*;
+
import java.io.File;
import java.net.InetAddress;
import java.security.MessageDigest;
@@ -9,6 +11,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.junit.Test;
+
import udt.UDTClient;
import udt.UDTInputStream;
import udt.UDTReceiver;
@@ -31,6 +35,7 @@
int READ_BUFFERSIZE=1*1024*1024;
+ @Test
public void test1()throws Exception{
Logger.getLogger("udt").setLevel(Level.INFO);
// System.setProperty("udt.receiver.storeStatistics","true");
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -3,6 +3,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.junit.Test;
+
import udt.UDTReceiver;
import udt.UDTSession;
import udt.cc.SimpleTCP;
@@ -22,6 +24,8 @@
int READ_BUFFERSIZE=1*1024*1024;
+ @Override
+ @Test
public void test1()throws Exception{
Logger.getLogger("udt").setLevel(Level.INFO);
UDTReceiver.dropRate=0;
Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -7,7 +7,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
-import junit.framework.TestCase;
+import org.junit.Test;
+
import udt.UDPEndPoint;
import udt.packets.DataPacket;
import udt.util.MeanValue;
@@ -15,11 +16,12 @@
/**
* send some data over a UDP connection and measure performance
*/
-public class UDPTest extends TestCase {
+public class UDPTest {
final int num_packets=10*10*1000;
final int packetSize=UDPEndPoint.DATAGRAM_SIZE;
+ @Test
public void test1()throws Exception{
runServer();
runThirdThread();
Modified: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
===================================================================
--- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2011-12-01 19:52:51 UTC (rev 68)
+++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2012-02-03 06:57:00 UTC (rev 69)
@@ -1,12 +1,20 @@
package udt.sender;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
import java.util.concurrent.TimeoutException;
-import junit.framework.TestCase;
+import org.junit.Test;
+
import udt.packets.DataPacket;
-public class TestFlowWindow extends TestCase {
+public class TestFlowWindow {
+ @Test
public void testFillWindow()throws InterruptedException, TimeoutException{
FlowWindow fw=new FlowWindow(3, 128);
DataPacket p1=fw.getForProducer();
@@ -38,6 +46,7 @@
assertTrue(fw.isEmpty());
}
+ @Test
public void testOverflow()throws InterruptedException, TimeoutException{
FlowWindow fw=new FlowWindow(3, 64);
DataPacket p1=fw.getForProducer();
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2011-12-01 19:52:58
|
Revision: 68
http://udt-java.svn.sourceforge.net/udt-java/?rev=68&view=rev
Author: bschuller
Date: 2011-12-01 19:52:51 +0000 (Thu, 01 Dec 2011)
Log Message:
-----------
apply a few fixes from P. Elgee
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
Modified: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2011-12-01 13:55:08 UTC (rev 67)
+++ udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2011-12-01 19:52:51 UTC (rev 68)
@@ -78,7 +78,7 @@
writePos++;
if(writePos==length)writePos=0;
validEntries++;
- isFull=validEntries==length-1;
+ isFull=validEntries==length;
isEmpty=false;
produced++;
}finally{
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-12-01 13:55:08 UTC (rev 67)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-12-01 19:52:51 UTC (rev 68)
@@ -120,7 +120,6 @@
}
else return null;
}
- numValidChunks.decrementAndGet();
return r;
}
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-12-01 13:55:08 UTC (rev 67)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-12-01 19:52:51 UTC (rev 68)
@@ -22,7 +22,7 @@
boolean running=false;
//how many
- int num_packets=500;
+ int num_packets=50;
//how large is a single packet
int size=20*1024*1024;
Modified: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
===================================================================
--- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2011-12-01 13:55:08 UTC (rev 67)
+++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2011-12-01 19:52:51 UTC (rev 68)
@@ -110,9 +110,7 @@
while( (p=fw.consumeData())==null){
Thread.sleep(1);
}
- synchronized (p) {
- assertEquals(i,p.getMessageNumber());
- }
+ assertEquals(i,p.getMessageNumber());
}
}catch(Throwable ex){
ex.printStackTrace();
@@ -131,11 +129,9 @@
do{
p=fw.getForProducer();
if(p!=null){
- synchronized(p){
- p.setData(("test"+i).getBytes());
- p.setMessageNumber(i);
- fw.produce();
- }
+ p.setData(("test"+i).getBytes());
+ p.setMessageNumber(i);
+ fw.produce();
}
}while(p==null);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2011-12-01 13:55:19
|
Revision: 67
http://udt-java.svn.sourceforge.net/udt-java/?rev=67&view=rev
Author: bschuller
Date: 2011-12-01 13:55:08 +0000 (Thu, 01 Dec 2011)
Log Message:
-----------
fix test failure
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/packets/DataPacket.java
Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-11-17 07:26:46 UTC (rev 66)
+++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-12-01 13:55:08 UTC (rev 67)
@@ -61,7 +61,6 @@
public DataPacket(byte[] encodedData, int length){
decode(encodedData,length);
- dataLength=length;
}
void decode(byte[]encodedData,int length){
@@ -69,8 +68,9 @@
messageNumber=PacketUtil.decode(encodedData, 4);
timeStamp=PacketUtil.decode(encodedData, 8);
destinationID=PacketUtil.decode(encodedData, 12);
- data=new byte[length-16];
- System.arraycopy(encodedData, 16, data, 0, data.length);
+ dataLength=length-16;
+ data=new byte[dataLength];
+ System.arraycopy(encodedData, 16, data, 0, dataLength);
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2011-11-17 07:26:52
|
Revision: 66
http://udt-java.svn.sourceforge.net/udt-java/?rev=66&view=rev
Author: bschuller
Date: 2011-11-17 07:26:46 +0000 (Thu, 17 Nov 2011)
Log Message:
-----------
fix single byte read()
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTInputStream.java
Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2011-10-16 11:41:23 UTC (rev 65)
+++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2011-11-17 07:26:46 UTC (rev 66)
@@ -82,7 +82,7 @@
b=read(single);
if(b>0){
- return single[0];
+ return single[0] & 0xFF;
}
else {
return b;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2011-10-16 11:41:29
|
Revision: 65
http://udt-java.svn.sourceforge.net/udt-java/?rev=65&view=rev
Author: bschuller
Date: 2011-10-16 11:41:23 +0000 (Sun, 16 Oct 2011)
Log Message:
-----------
remove Thread.sleep call, and reduce waiting interval while waiting for session to become ready
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTClient.java
Modified: udt-java/trunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTClient.java 2011-08-16 17:56:07 UTC (rev 64)
+++ udt-java/trunk/src/main/java/udt/UDTClient.java 2011-10-16 11:41:23 UTC (rev 65)
@@ -85,10 +85,9 @@
clientSession.connect();
//wait for handshake
while(!clientSession.isReady()){
- Thread.sleep(500);
+ Thread.sleep(5);
}
logger.info("The UDTClient is connected");
- Thread.sleep(500);
}
/**
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2011-08-16 17:56:14
|
Revision: 64
http://udt-java.svn.sourceforge.net/udt-java/?rev=64&view=rev
Author: bschuller
Date: 2011-08-16 17:56:07 +0000 (Tue, 16 Aug 2011)
Log Message:
-----------
fix two bugs: thanks to ajsenf (Alexander Senf)
see https://sourceforge.net/projects/udt-java/forums/forum/1109269/topic/4615162?message=10597365
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTClient.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java
udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
Modified: udt-java/trunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTClient.java 2011-08-13 00:24:18 UTC (rev 63)
+++ udt-java/trunk/src/main/java/udt/UDTClient.java 2011-08-16 17:56:07 UTC (rev 64)
@@ -36,6 +36,7 @@
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -115,15 +116,15 @@
}
/**
- * flush outstanding data (and make sure it is acknowledged)
+ * flush outstanding data, with the specified maximum waiting time
+ * @param timeOut - timeout in millis (if smaller than 0, no timeout is used)
* @throws IOException
* @throws InterruptedException
*/
- public void flush()throws IOException, InterruptedException{
+ public void flush()throws IOException, InterruptedException, TimeoutException{
clientSession.getSocket().flush();
}
-
public void shutdown()throws IOException{
if (clientSession.isReady()&& clientSession.active==true)
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2011-08-13 00:24:18 UTC (rev 63)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-08-16 17:56:07 UTC (rev 64)
@@ -39,7 +39,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -110,11 +111,9 @@
private volatile CountDownLatch startLatch=new CountDownLatch(1);
//used by the sender to wait for an ACK
- private final AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>();
+ private final ReentrantLock ackLock=new ReentrantLock();
+ private final Condition ackCondition=ackLock.newCondition();
- //used by the sender to wait for an ACK of a certain sequence number
- private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>();
-
private final boolean storeStatistics;
private final int chunksize;
@@ -130,8 +129,6 @@
flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize);
lastAckSequenceNumber=session.getInitialSequenceNumber();
currentSequenceNumber=session.getInitialSequenceNumber()-1;
- waitForAckLatch.set(new CountDownLatch(1));
- waitForSeqAckLatch.set(new CountDownLatch(1));
storeStatistics=Boolean.getBoolean("udt.sender.storeStatistics");
initMetrics();
doStart();
@@ -278,8 +275,9 @@
}
protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{
- waitForAckLatch.get().countDown();
- waitForSeqAckLatch.get().countDown();
+ ackLock.lock();
+ ackCondition.signal();
+ ackLock.unlock();
CongestionControl cc=session.getCongestionControl();
long rtt=acknowledgement.getRoundTripTime();
@@ -407,6 +405,8 @@
}
}
+ private final DataPacket retransmit=new DataPacket();
+
/**
* re-transmit an entry from the sender loss list
* @param entry
@@ -416,13 +416,11 @@
//retransmit the packet and remove it from the list
byte[]data=sendBuffer.get(seqNumber);
if(data!=null){
- //System.out.println("re-transmit "+data);
- DataPacket packet=new DataPacket();
- packet.setPacketSequenceNumber(seqNumber);
- packet.setSession(session);
- packet.setDestinationID(session.getDestination().getSocketID());
- packet.setData(data);
- endpoint.doSend(packet);
+ retransmit.setPacketSequenceNumber(seqNumber);
+ retransmit.setSession(session);
+ retransmit.setDestinationID(session.getDestination().getSocketID());
+ retransmit.setData(data);
+ endpoint.doSend(retransmit);
statistics.incNumberOfRetransmittedDataPackets();
}
}catch (Exception e) {
@@ -486,18 +484,37 @@
*/
public void waitForAck(long sequenceNumber)throws InterruptedException{
while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){
- waitForSeqAckLatch.set(new CountDownLatch(1));
- waitForSeqAckLatch.get().await(10, TimeUnit.MILLISECONDS);
+ ackLock.lock();
+ try{
+ ackCondition.await(100, TimeUnit.MICROSECONDS);
+ }finally{
+ ackLock.unlock();
+ }
}
}
+ public void waitForAck(long sequenceNumber, int timeout)throws InterruptedException{
+ while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){
+ ackLock.lock();
+ try{
+ ackCondition.await(timeout, TimeUnit.MILLISECONDS);
+ }finally{
+ ackLock.unlock();
+ }
+ }
+ }
+
/**
* wait for the next acknowledge
* @throws InterruptedException
*/
public void waitForAck()throws InterruptedException{
- waitForAckLatch.set(new CountDownLatch(1));
- waitForAckLatch.get().await(200, TimeUnit.MICROSECONDS);
+ ackLock.lock();
+ try{
+ ackCondition.await(200, TimeUnit.MICROSECONDS);
+ }finally{
+ ackLock.unlock();
+ }
}
Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2011-08-13 00:24:18 UTC (rev 63)
+++ udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2011-08-16 17:56:07 UTC (rev 64)
@@ -61,16 +61,16 @@
}
void decode(byte[]data){
+ ackSequenceNumber=PacketUtil.decode(data, 0);
}
public boolean forSender(){
return false;
}
- private static final byte[]empty=new byte[0];
@Override
public byte[] encodeControlInformation(){
- return empty;
+ return PacketUtil.encode(ackSequenceNumber);
}
}
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-08-13 00:24:18 UTC (rev 63)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-08-16 17:56:07 UTC (rev 64)
@@ -54,7 +54,7 @@
try{
long seq=data.getSequenceNumber();
//if already have this chunk, discard it
- if(SequenceNumber.compare(seq, initialSequenceNumber)<0)return true;
+ if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0)return true;
//else compute insert position
int offset=(int)SequenceNumber.seqOffset(initialSequenceNumber, seq);
int insert=offset% size;
@@ -120,6 +120,7 @@
}
else return null;
}
+ numValidChunks.decrementAndGet();
return r;
}
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-08-13 00:24:18 UTC (rev 63)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-08-16 17:56:07 UTC (rev 64)
@@ -5,6 +5,7 @@
import java.security.MessageDigest;
import java.text.NumberFormat;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -21,10 +22,10 @@
boolean running=false;
//how many
- int num_packets=100;
+ int num_packets=500;
//how large is a single packet
- int size=1*1024*1024;
+ int size=20*1024*1024;
int TIMEOUT=Integer.MAX_VALUE;
@@ -36,7 +37,12 @@
// System.setProperty("udt.sender.storeStatistics","true");
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
- doTest();
+ try{
+ doTest();
+ }catch(TimeoutException te){
+ te.printStackTrace();
+ fail();
+ }
}
private final NumberFormat format=NumberFormat.getNumberInstance();
@@ -59,6 +65,7 @@
long start=System.currentTimeMillis();
System.out.println("Sending <"+num_packets+"> packets of <"+format.format(size/1024.0/1024.0)+"> Mbytes each");
long end=0;
+
if(serverRunning){
for(int i=0;i<num_packets;i++){
long block=System.currentTimeMillis();
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tho...@us...> - 2011-08-13 00:24:24
|
Revision: 63
http://udt-java.svn.sourceforge.net/udt-java/?rev=63&view=rev
Author: thomasowens
Date: 2011-08-13 00:24:18 +0000 (Sat, 13 Aug 2011)
Log Message:
-----------
Added a dependency on log4j to the POM. It's not used yet, but I hope to use it in the tests, if not also in the main code.
Modified Paths:
--------------
udt-java/skunk/pom.xml
Modified: udt-java/skunk/pom.xml
===================================================================
--- udt-java/skunk/pom.xml 2011-08-13 00:23:21 UTC (rev 62)
+++ udt-java/skunk/pom.xml 2011-08-13 00:24:18 UTC (rev 63)
@@ -3,7 +3,6 @@
<modelVersion>4.0.0</modelVersion>
<groupId>udt-java</groupId>
<artifactId>udt-java</artifactId>
- <packaging>jar</packaging>
<name>UDT Java implementation</name>
<version>0.6-SNAPSHOT</version>
<url>http://sourceforge.net/projects/udt-java</url>
@@ -30,6 +29,11 @@
<version>3.8.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.16</version>
+ </dependency>
</dependencies>
<build>
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tho...@us...> - 2011-08-13 00:23:28
|
Revision: 62
http://udt-java.svn.sourceforge.net/udt-java/?rev=62&view=rev
Author: thomasowens
Date: 2011-08-13 00:23:21 +0000 (Sat, 13 Aug 2011)
Log Message:
-----------
Updated tests to reflect that UDPEndpoint was replaced by UDPMultiplexer.
Modified Paths:
--------------
udt-java/skunk/src/test/java/echo/EchoServer.java
udt-java/skunk/src/test/java/udt/TestSendFileReceiveFile.java
udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java
udt-java/skunk/src/test/java/udt/performance/UDPTest.java
Modified: udt-java/skunk/src/test/java/echo/EchoServer.java
===================================================================
--- udt-java/skunk/src/test/java/echo/EchoServer.java 2011-08-05 06:54:24 UTC (rev 61)
+++ udt-java/skunk/src/test/java/echo/EchoServer.java 2011-08-13 00:23:21 UTC (rev 62)
@@ -11,81 +11,85 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import udt.UDTInputStream;
-import udt.UDTOutputStream;
import udt.UDTServerSocket;
-import udt.UDTSocket;
import udt.util.UDTThreadFactory;
-public class EchoServer implements Runnable{
+public class EchoServer implements Runnable {
- final ExecutorService pool=Executors.newFixedThreadPool(2);
+ final ExecutorService pool = Executors.newFixedThreadPool(2);
final UDTServerSocket server;
final Thread serverThread;
- volatile boolean started=false;
- volatile boolean stopped=false;
+ volatile boolean started = false;
+ volatile boolean stopped = false;
- public EchoServer(int port)throws Exception{
- server=new UDTServerSocket(InetAddress.getByName("localhost"),port);
- serverThread=UDTThreadFactory.get().newThread(this);
+ public EchoServer(int port) throws Exception {
+ server = new UDTServerSocket(InetAddress.getByName("localhost"), port);
+ serverThread = UDTThreadFactory.get().newThread(this);
}
- public void start(){
+ public void start() {
serverThread.start();
}
-
- public void stop(){
- stopped=true;
+
+ public void stop() {
+ stopped = true;
}
- public void run(){
- try{
- started=true;
- while(!stopped){
- final Socket socket=server.accept();
+
+ public void run() {
+ try {
+ started = true;
+ while (!stopped) {
+ final Socket socket = server.accept();
pool.execute(new Request(socket));
}
- }catch(Exception ex){
+ } catch (Exception ex) {
ex.printStackTrace();
}
}
- static String readLine(InputStream r)throws IOException{
- ByteArrayOutputStream bos=new ByteArrayOutputStream();
- while(true){
- int c=r.read();
- if(c<0 && bos.size()==0)return null;
- if(c<0 || c==10)break;
- else bos.write(c);
+ static String readLine(InputStream r) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ while (true) {
+ int c = r.read();
+ if (c < 0 && bos.size() == 0)
+ return null;
+ if (c < 0 || c == 10)
+ break;
+ else
+ bos.write(c);
}
return bos.toString();
}
+ public static class Request implements Runnable {
- public static class Request implements Runnable{
-
final Socket socket;
- public Request(Socket socket){
- this.socket=socket;
+ public Request(Socket socket) {
+ this.socket = socket;
}
- public void run(){
- try{
- System.out.println("Processing request from <"+socket.getRemoteSocketAddress().toString()+">");
- InputStream in=socket.getInputStream();
- OutputStream out=socket.getOutputStream();
- PrintWriter writer=new PrintWriter(new OutputStreamWriter(out));
- String line=readLine(in);
- if(line!=null){
- System.out.println("ECHO: "+line);
- //else echo back the line
+ public void run() {
+ try {
+ System.out.println("Processing request from <"
+ + socket.getRemoteSocketAddress().toString() + ">");
+ InputStream in = socket.getInputStream();
+ OutputStream out = socket.getOutputStream();
+ PrintWriter writer = new PrintWriter(
+ new OutputStreamWriter(out));
+ String line = readLine(in);
+ if (line != null) {
+ System.out.println("ECHO: " + line);
+ // else echo back the line
writer.println(line);
writer.flush();
}
- System.out.println("Request from <"+socket.getRemoteSocketAddress().toString()+"> finished.");
- }catch(Exception ex){
+ System.out.println("Request from <"
+ + socket.getRemoteSocketAddress().toString()
+ + "> finished.");
+ } catch (Exception ex) {
ex.printStackTrace();
}
}
Modified: udt-java/skunk/src/test/java/udt/TestSendFileReceiveFile.java
===================================================================
--- udt-java/skunk/src/test/java/udt/TestSendFileReceiveFile.java 2011-08-05 06:54:24 UTC (rev 61)
+++ udt-java/skunk/src/test/java/udt/TestSendFileReceiveFile.java 2011-08-13 00:23:21 UTC (rev 62)
@@ -2,13 +2,14 @@
import java.io.File;
import java.io.FileInputStream;
+import java.util.logging.Logger;
import udt.util.ReceiveFile;
import udt.util.SendFile;
import udt.util.UDTThreadFactory;
public class TestSendFileReceiveFile extends UDTTestBase{
-
+
volatile boolean serverStarted=false;
public void test1()throws Exception{
Modified: udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java
===================================================================
--- udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java 2011-08-05 06:54:24 UTC (rev 61)
+++ udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java 2011-08-13 00:23:21 UTC (rev 62)
@@ -7,74 +7,80 @@
import udt.packets.UDTSocketAddress;
-public class TestUdpEndpoint extends UDTTestBase{
+public class TestUdpEndpoint extends UDTTestBase {
- public void testClientServerMode()throws Exception{
+ public void testClientServerMode() throws Exception {
- //select log level
+ // select log level
Logger.getLogger("udt").setLevel(Level.INFO);
-
- UDPEndPoint server= UDPEndPoint.get(InetAddress.getByName("localhost"),65322);
+
+ UDPMultiplexer server = UDPMultiplexer.get(
+ InetAddress.getByName("localhost"), 65322);
server.start();
- UDTClient client=new UDTClient(InetAddress.getByName("localhost"),12346);
+ UDTClient client = new UDTClient(InetAddress.getByName("localhost"),
+ 12346);
client.connect("localhost", 65322);
-
- //test a large message (resulting in multiple data packets)
- int num_packets=100;
- int N=num_packets*1024;
- byte[]data=getRandomData(N);
-
+
+ // test a large message (resulting in multiple data packets)
+ int num_packets = 100;
+ int N = num_packets * 1024;
+ byte[] data = getRandomData(N);
+
client.sendBlocking(data);
Thread.sleep(2000);
System.out.println(client.getStatistics());
- System.out.println(server.getSessions().iterator().next().getStatistics());
- int sent=client.getStatistics().getNumberOfSentDataPackets();
- int received=server.getSessions().iterator().next().getStatistics().getNumberOfReceivedDataPackets();
+ System.out.println(server.getSessions().iterator().next()
+ .getStatistics());
+ int sent = client.getStatistics().getNumberOfSentDataPackets();
+ int received = server.getSessions().iterator().next().getStatistics()
+ .getNumberOfReceivedDataPackets();
assertEquals(sent, received);
-
+
server.stop();
Thread.sleep(2000);
- }
-
-
+ }
+
/**
* just check how fast we can send out UDP packets from the endpoint
+ *
* @throws Exception
*/
- public void testRawSendRate()throws Exception{
+ public void testRawSendRate() throws Exception {
Logger.getLogger("udt").setLevel(Level.WARNING);
System.out.println("Checking raw UDP send rate...");
- InetAddress localhost=InetAddress.getByName("localhost");
- UDPEndPoint endpoint=UDPEndPoint.get(localhost,65322);
+ InetAddress localhost = InetAddress.getByName("localhost");
+ UDPMultiplexer endpoint = UDPMultiplexer.get(localhost, 65322);
endpoint.start();
- int socketID = endpoint.getUniqueSocketID();
- UDTSocketAddress d1=new UDTSocketAddress(localhost,12345,socketID);
- int dataSize=UDTSession.DEFAULT_DATAGRAM_SIZE;
- DatagramPacket p=new DatagramPacket(getRandomData(dataSize),dataSize,d1.getAddress(),d1.getPort());
- int N=100000;
- long start=System.currentTimeMillis();
- //send many packets as fast as we can
- for(int i=0;i<N;i++){
+ int socketID = endpoint.getUniqueSocketID();
+ UDTSocketAddress d1 = new UDTSocketAddress(localhost, 12345, socketID);
+ int dataSize = UDTSession.DEFAULT_DATAGRAM_SIZE;
+ DatagramPacket p = new DatagramPacket(getRandomData(dataSize),
+ dataSize, d1.getAddress(), d1.getPort());
+ int N = 100000;
+ long start = System.currentTimeMillis();
+ // send many packets as fast as we can
+ for (int i = 0; i < N; i++) {
endpoint.sendRaw(p);
}
- long end=System.currentTimeMillis();
- float rate=1000*N/(end-start);
- System.out.println("PacketRate: "+(int)rate+" packets/sec.");
- float dataRate=dataSize*rate/1024/1024;
- System.out.println("Data Rate: "+(int)dataRate+" MBytes/sec.");
+ long end = System.currentTimeMillis();
+ float rate = 1000 * N / (end - start);
+ System.out.println("PacketRate: " + (int) rate + " packets/sec.");
+ float dataRate = dataSize * rate / 1024 / 1024;
+ System.out.println("Data Rate: " + (int) dataRate + " MBytes/sec.");
endpoint.stop();
Thread.sleep(1000);
}
-
- //no rendezvous yet...
- public void x_testRendezvousConnect()throws Exception{
-
+
+ // no rendezvous yet...
+ public void x_testRendezvousConnect() throws Exception {
+
}
-
- public void testBindToAnyPort()throws Exception{
- UDPEndPoint ep=UDPEndPoint.get(InetAddress.getByName("localhost"),0);
- int port=ep.getLocalPort();
- assertTrue(port>0);
+
+ public void testBindToAnyPort() throws Exception {
+ UDPMultiplexer ep = UDPMultiplexer.get(
+ InetAddress.getByName("localhost"), 0);
+ int port = ep.getLocalPort();
+ assertTrue(port > 0);
}
-
+
}
Modified: udt-java/skunk/src/test/java/udt/performance/UDPTest.java
===================================================================
--- udt-java/skunk/src/test/java/udt/performance/UDPTest.java 2011-08-05 06:54:24 UTC (rev 61)
+++ udt-java/skunk/src/test/java/udt/performance/UDPTest.java 2011-08-13 00:23:21 UTC (rev 62)
@@ -8,7 +8,7 @@
import java.util.concurrent.SynchronousQueue;
import junit.framework.TestCase;
-import udt.UDPEndPoint;
+import udt.UDPMultiplexer;
import udt.packets.DataPacket;
import udt.util.MeanValue;
@@ -17,30 +17,32 @@
*/
public class UDPTest extends TestCase {
- final int num_packets=10*10*1000;
- final int packetSize=UDPEndPoint.DATAGRAM_SIZE;
+ final int num_packets = 10 * 10 * 1000;
+ final int packetSize = UDPMultiplexer.DATAGRAM_SIZE;
- public void test1()throws Exception{
+ public void test1() throws Exception {
runServer();
runThirdThread();
-
- //client socket
- DatagramSocket s=new DatagramSocket(12345);
-
- //generate a test array with random content
- N=num_packets*packetSize;
- byte[]data=new byte[packetSize];
+
+ // client socket
+ DatagramSocket s = new DatagramSocket(12345);
+
+ // generate a test array with random content
+ N = num_packets * packetSize;
+ byte[] data = new byte[packetSize];
new Random().nextBytes(data);
- long start=System.currentTimeMillis();
- DatagramPacket dp=new DatagramPacket(new byte[packetSize],packetSize);
+ long start = System.currentTimeMillis();
+ DatagramPacket dp = new DatagramPacket(new byte[packetSize], packetSize);
dp.setAddress(InetAddress.getByName("localhost"));
dp.setPort(65321);
- System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes");
- MeanValue dgSendTime=new MeanValue("Datagram send time",false);
- MeanValue dgSendInterval=new MeanValue("Datagram send interval",false);
-
- for(int i=0;i<num_packets;i++){
- DataPacket p=new DataPacket();
+ System.out.println("Sending " + num_packets + " data blocks of <"
+ + packetSize + "> bytes");
+ MeanValue dgSendTime = new MeanValue("Datagram send time", false);
+ MeanValue dgSendInterval = new MeanValue("Datagram send interval",
+ false);
+
+ for (int i = 0; i < num_packets; i++) {
+ DataPacket p = new DataPacket();
p.setData(data);
dp.setData(p.getEncoded());
dgSendInterval.end();
@@ -50,72 +52,76 @@
dgSendInterval.begin();
}
System.out.println("Finished sending.");
- while(serverRunning)Thread.sleep(10);
+ while (serverRunning)
+ Thread.sleep(10);
System.out.println("Server stopped.");
- long end=System.currentTimeMillis();
- System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms");
- float rate=N/1000/(end-start);
- System.out.println("Rate "+rate+" Mbytes/sec "+(rate*8)+ " Mbit/sec");
- System.out.println("Rate "+num_packets+" packets/sec");
- System.out.println("Mean send time "+dgSendTime.get());
- System.out.println("Mean send interval "+dgSendInterval.get());
- System.out.println("Server received: "+total);
+ long end = System.currentTimeMillis();
+ System.out.println("Done. Sending " + N / 1024 / 1024 + " Mbytes took "
+ + (end - start) + " ms");
+ float rate = N / 1000 / (end - start);
+ System.out.println("Rate " + rate + " Mbytes/sec " + (rate * 8)
+ + " Mbit/sec");
+ System.out.println("Rate " + num_packets + " packets/sec");
+ System.out.println("Mean send time " + dgSendTime.get());
+ System.out.println("Mean send interval " + dgSendInterval.get());
+ System.out.println("Server received: " + total);
}
- int N=0;
- long total=0;
- volatile boolean serverRunning=true;
+ int N = 0;
+ long total = 0;
+ volatile boolean serverRunning = true;
- private void runServer()throws Exception{
- //server socket
- final DatagramSocket serverSocket=new DatagramSocket(65321);
+ private void runServer() throws Exception {
+ // server socket
+ final DatagramSocket serverSocket = new DatagramSocket(65321);
- Runnable serverProcess=new Runnable(){
- public void run(){
- try{
- byte[]buf=new byte[packetSize];
- DatagramPacket dp=new DatagramPacket(buf,buf.length);
- while(true){
+ Runnable serverProcess = new Runnable() {
+ public void run() {
+ try {
+ byte[] buf = new byte[packetSize];
+ DatagramPacket dp = new DatagramPacket(buf, buf.length);
+ while (true) {
serverSocket.receive(dp);
handoff.offer(dp);
- total+=dp.getLength();
+ total += dp.getLength();
}
- }
- catch(Exception e){
+ } catch (Exception e) {
e.printStackTrace();
}
- serverRunning=false;
+ serverRunning = false;
}
};
- Thread t=new Thread(serverProcess);
+ Thread t = new Thread(serverProcess);
t.start();
}
-
- private final BlockingQueue<DatagramPacket> handoff=new SynchronousQueue<DatagramPacket>();
-
- private void runThirdThread()throws Exception{
- Runnable serverProcess=new Runnable(){
- public void run(){
- try{
- long start=System.currentTimeMillis();
- while(true){
- DatagramPacket dp=handoff.poll();
- if(dp!=null)total+=dp.getLength();
- if(total==N)break;
+
+ private final BlockingQueue<DatagramPacket> handoff = new SynchronousQueue<DatagramPacket>();
+
+ private void runThirdThread() throws Exception {
+ Runnable serverProcess = new Runnable() {
+ public void run() {
+ try {
+ long start = System.currentTimeMillis();
+ while (true) {
+ DatagramPacket dp = handoff.poll();
+ if (dp != null)
+ total += dp.getLength();
+ if (total == N)
+ break;
}
- long end=System.currentTimeMillis();
- System.out.println("Server time: "+(end-start)+" ms.");
+ long end = System.currentTimeMillis();
+ System.out
+ .println("Server time: " + (end - start) + " ms.");
- }
- catch(Exception e){
+ } catch (Exception e) {
e.printStackTrace();
}
- serverRunning=false;
+ serverRunning = false;
}
};
- Thread t=new Thread(serverProcess);
+ Thread t = new Thread(serverProcess);
t.start();
-
+
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <pe...@us...> - 2011-08-05 06:54:32
|
Revision: 61
http://udt-java.svn.sourceforge.net/udt-java/?rev=61&view=rev
Author: pete_
Date: 2011-08-05 06:54:24 +0000 (Fri, 05 Aug 2011)
Log Message:
-----------
Renamed UDPEndPoint to UDPMultiplexer; this will make the implementation easier to understand for newcomers who've read "UDTv4: Improvements in Performance and Usability".
Modified Paths:
--------------
udt-java/skunk/src/main/java/udt/ClientSession.java
udt-java/skunk/src/main/java/udt/ServerSession.java
udt-java/skunk/src/main/java/udt/UDTClient.java
udt-java/skunk/src/main/java/udt/UDTCongestionControl.java
udt-java/skunk/src/main/java/udt/UDTReceiver.java
udt-java/skunk/src/main/java/udt/UDTSender.java
udt-java/skunk/src/main/java/udt/UDTServerSocket.java
udt-java/skunk/src/main/java/udt/UDTSession.java
udt-java/skunk/src/main/java/udt/UDTSocket.java
udt-java/skunk/src/main/java/udt/util/Util.java
Added Paths:
-----------
udt-java/skunk/src/main/java/udt/UDPMultiplexer.java
Removed Paths:
-------------
udt-java/skunk/src/main/java/udt/UDPEndPoint.java
Modified: udt-java/skunk/src/main/java/udt/ClientSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -50,9 +50,9 @@
private static final Logger logger=Logger.getLogger(ClientSession.class.getName());
- private UDPEndPoint endPoint;
+ private UDPMultiplexer endPoint;
- public ClientSession(UDPEndPoint endPoint, UDTSocketAddress dest)throws SocketException{
+ public ClientSession(UDPMultiplexer endPoint, UDTSocketAddress dest)throws SocketException{
super("ClientSession localPort="+endPoint.getLocalPort(),dest);
this.endPoint=endPoint;
logger.info("Created "+toString());
Modified: udt-java/skunk/src/main/java/udt/ServerSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -51,12 +51,12 @@
private static final Logger logger=Logger.getLogger(ServerSession.class.getName());
- private final UDPEndPoint endPoint;
+ private final UDPMultiplexer endPoint;
//last received packet (for testing purposes)
private UDTPacket lastPacket;
- public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{
+ public ServerSession(DatagramPacket dp, UDPMultiplexer endPoint)throws SocketException,UnknownHostException{
super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new UDTSocketAddress(dp.getAddress(),dp.getPort(),0));
this.endPoint=endPoint;
logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort());
Deleted: udt-java/skunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -1,540 +0,0 @@
-/*********************************************************************************
- * Copyright (c) 2010 Forschungszentrum Juelich GmbH
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * (1) Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the disclaimer at the end. Redistributions in
- * binary form must reproduce the above copyright notice, this list of
- * conditions and the following disclaimer in the documentation and/or other
- * materials provided with the distribution.
- *
- * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its
- * contributors may be used to endorse or promote products derived from this
- * software without specific prior written permission.
- *
- * DISCLAIMER
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *********************************************************************************/
-
-package udt;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.WeakHashMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import udt.packets.ConnectionHandshake;
-import udt.packets.UDTSocketAddress;
-import udt.packets.PacketFactory;
-import udt.util.ObjectPool;
-import udt.util.UDTThreadFactory;
-
-/**
- * the UDPEndpoint takes care of sending and receiving UDP network packets,
- * dispatching them to the correct {@link UDTSession}
- */
-public class UDPEndPoint {
-
- //class fields
- private static final Logger logger=Logger.getLogger(ClientSession.class.getName());
- public static final int DATAGRAM_SIZE=1400;
-
-
- //class methods
- private static final WeakHashMap<SocketAddress, UDPEndPoint> localEndpoints
- = new WeakHashMap<SocketAddress, UDPEndPoint>();
-
- public static UDPEndPoint get(DatagramSocket socket){
- SocketAddress localInetSocketAddress = null;
- UDPEndPoint result = null;
- if ( socket.isBound()){
- SocketAddress sa = socket.getLocalSocketAddress();
- if ( sa instanceof InetSocketAddress ){
- localInetSocketAddress = (InetSocketAddress) sa;
- } else {
- // Must be a special DatagramSocket impl or extended.
- localInetSocketAddress =
- new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
- }
- synchronized (localEndpoints){
- result = localEndpoints.get(localInetSocketAddress);
- }
- }
- if (result != null) return result;
- try {
- result = new UDPEndPoint(socket);
- if (localInetSocketAddress == null){
- // The DatagramSocket was unbound, it should be bound now.
- localInetSocketAddress =
- new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
- }
- } catch (SocketException ex) {
- Logger.getLogger(UDPEndPoint.class.getName()).log(Level.SEVERE, null, ex);
- }
- if (result != null){
- synchronized (localEndpoints){
- UDPEndPoint exists = localEndpoints.get(localInetSocketAddress);
- if (exists != null && exists.getSocket().equals(socket)) result = exists;
- // Only cache if a record doesn't already exist.
- else if (exists == null) localEndpoints.put(localInetSocketAddress, result);
- }
- }
- return result; // may be null.
- }
-
-
-
- public static UDPEndPoint get(InetAddress localAddress, int localPort){
- InetSocketAddress localInetSocketAddress = new InetSocketAddress(localAddress, localPort);
- return get(localInetSocketAddress);
- }
-
- public static UDPEndPoint get(SocketAddress localSocketAddress){
- InetSocketAddress localInetSocketAddress = null;
- if (localSocketAddress instanceof InetSocketAddress){
- localInetSocketAddress = (InetSocketAddress) localSocketAddress;
- } else if (localSocketAddress instanceof UDTSocketAddress){
- UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress;
- localInetSocketAddress =
- new InetSocketAddress(udtSA.getAddress(), udtSA.getPort());
- }
- if (localInetSocketAddress == null) return null;
- UDPEndPoint result = null;
- synchronized (localEndpoints){
- result = localEndpoints.get(localInetSocketAddress);
- }
- if (result != null) return result;
- try {
- result = new UDPEndPoint(localInetSocketAddress);
- if (localInetSocketAddress.getPort() == 0 ||
- localInetSocketAddress.getAddress().isAnyLocalAddress()){
- // ephemeral port or wildcard address, bind operation is complete.
- localInetSocketAddress =
- new InetSocketAddress(result.getLocalAddress(), result.getLocalPort());
- }
- } catch (SocketException ex) {
- logger.log(Level.SEVERE, null, ex);
- } catch (UnknownHostException ex) {
- logger.log(Level.SEVERE, null, ex);
- }
- if (result != null){
- synchronized (localEndpoints){
- UDPEndPoint exists = localEndpoints.get(localInetSocketAddress);
- if (exists != null) result = exists;
- else localEndpoints.put(localInetSocketAddress, result);
- }
- }
- return result; // may be null.
- }
-
- /**
- * Allows a custom endpoint to be added to the pool.
- * @param endpoint
- */
- public static void put(UDPEndPoint endpoint){
- SocketAddress local = endpoint.getSocket().getLocalSocketAddress();
- synchronized (localEndpoints){
- localEndpoints.put(local, endpoint);
- }
- }
-
- //object fields
- private final int port;
-
- private final DatagramSocket dgSocket;
-
- //active sessions keyed by socket ID
- private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>();
-
- //last received packet
- private UDTPacket lastPacket;
-
- //if the endpoint is configured for a server socket,
- //this queue is used to handoff new UDTSessions to the application
- private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>();
-
- private final ObjectPool<BlockingQueue<UDTSession>> queuePool
- = new ObjectPool<BlockingQueue<UDTSession>>(20);
-
- private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff
- = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120);
-
- // registered sockets
- private final Set<Integer> registeredSockets = new HashSet<Integer>(120);
- // registered sockets lock
- private final ReadWriteLock rwl = new ReentrantReadWriteLock();
- private final Lock readSocketLock = rwl.readLock();
- private final Lock writeSocketLock = rwl.writeLock();
-
-
- private boolean serverSocketMode=false;
-
- //has the endpoint been stopped?
- private volatile boolean stopped=false;
-
- private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000));
-
-
- /**
- * create an endpoint on the given socket
- *
- * @param socket - a UDP datagram socket
- * @throws SocketException
- */
- protected UDPEndPoint(DatagramSocket socket) throws SocketException{
- this.dgSocket=socket;
- if (!socket.isBound()){
- socket.bind(null);
- }
- port=dgSocket.getLocalPort();
- }
-
- /**
- * bind to any local port on the given host address
- * @param localAddress
- * @throws SocketException
- * @throws UnknownHostException
- */
- private UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{
- this(localAddress,0);
- }
-
- /**
- * Bind to the given address and port
- * @param localAddress
- * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port.
- * @throws SocketException
- * @throws UnknownHostException
- */
- private UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{
- if(localAddress==null){
- dgSocket=new DatagramSocket(localPort, localAddress);
- }else{
- dgSocket=new DatagramSocket(localPort);
- }
- if(localPort>0)this.port = localPort;
- else port=dgSocket.getLocalPort();
-
- configureSocket();
- }
-
- private UDPEndPoint (InetSocketAddress localSocketAddress)
- throws SocketException, UnknownHostException {
- dgSocket = new DatagramSocket(localSocketAddress);
- port = dgSocket.getLocalPort();
- configureSocket();
- }
-
- protected void configureSocket()throws SocketException{
- //set a time out to avoid blocking in doReceive()
- dgSocket.setSoTimeout(100000);
- //buffer size
- dgSocket.setReceiveBufferSize(128*1024);
- dgSocket.setReuseAddress(false);
- }
-
- /**
- * bind to the default network interface on the machine
- *
- * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port.
- * @throws SocketException
- * @throws UnknownHostException
- */
- public UDPEndPoint(int localPort)throws SocketException, UnknownHostException{
- this(null,localPort);
- }
-
- /**
- * bind to an ephemeral port on the default network interface on the machine
- *
- * @throws SocketException
- * @throws UnknownHostException
- */
- public UDPEndPoint()throws SocketException, UnknownHostException{
- this(null,0);
- }
-
- /**
- * start the endpoint. If the serverSocketModeEnabled flag is <code>true</code>,
- * a new connection can be handed off to an application. The application needs to
- * call #accept() to get the socket
- * @param serverSocketModeEnabled
- */
- public void start(boolean serverSocketModeEnabled){
- serverSocketMode=serverSocketModeEnabled;
- //start receive thread
- Runnable receive=new Runnable(){
- public void run(){
- try{
- doReceive();
- }catch(Exception ex){
- logger.log(Level.WARNING,"",ex);
- }
- }
- };
- Thread t=UDTThreadFactory.get().newThread(receive);
- t.setName("UDPEndpoint-"+t.getName());
- t.setDaemon(true);
- t.start();
- logger.info("UDTEndpoint started.");
- }
-
- public void start(){
- start(false);
- }
-
- public void stop(){
- stopped=true;
- dgSocket.close();
- }
-
- /**
- * Provides assistance to a socket to determine a random socket id,
- * every caller receives a unique value. This value is unique at the
- * time of calling, however it may not be at registration time.
- *
- * This socketID has not been registered, all socket ID's must be
- * registered or connection will fail.
- * @return
- */
- public int getUniqueSocketID(){
- Integer socketID = nextSocketID.getAndIncrement();
- try{
- readSocketLock.lock();
- while (registeredSockets.contains(socketID)){
- socketID = nextSocketID.getAndIncrement();
- }
- return socketID; // should we register it?
- } finally {
- readSocketLock.unlock();
- }
- }
-
- void registerSocketID(int socketID, UDTSocket socket) throws SocketException {
- if (!equals(socket.getEndpoint())) throw new SocketException (
- "Socket doesn't originate for this endpoint: "
- + socket.toString());
- try {
- writeSocketLock.lock();
- if (registeredSockets.contains(socketID)){
- throw new SocketException("Already registered, Socket ID: " +socketID);
- }
- registeredSockets.add(socketID);
- }finally{
- writeSocketLock.unlock();
- }
- }
-
- /**
- * @return the port which this client is bound to
- */
- public int getLocalPort() {
- return this.dgSocket.getLocalPort();
- }
- /**
- * @return Gets the local address to which the socket is bound
- */
- public InetAddress getLocalAddress(){
- return this.dgSocket.getLocalAddress();
- }
-
- DatagramSocket getSocket(){
- return dgSocket;
- }
-
- UDTPacket getLastPacket(){
- return lastPacket;
- }
-
- public void addSession(Integer destinationID,UDTSession session){
- logger.log(Level.INFO, "Storing session <{0}>", destinationID);
- sessions.put(destinationID, session);
- }
-
- public UDTSession getSession(Long destinationID){
- return sessions.get(destinationID);
- }
-
- public Collection<UDTSession> getSessions(){
- return sessions.values();
- }
-
- /**
- * wait the given time for a new connection
- * @param timeout - the time to wait
- * @param unit - the {@link TimeUnit}
- * @param socketID - the socket id.
- * @return a new {@link UDTSession}
- * @throws InterruptedException
- */
- protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{
- //return sessionHandoff.poll(timeout, unit);
- BlockingQueue<UDTSession> session = handoff.get(socketID);
- try {
- if (session == null){
- session = queuePool.get();
- if (session == null) {
- session = new ArrayBlockingQueue<UDTSession>(1);
- }
- BlockingQueue<UDTSession> existed = handoff.putIfAbsent(socketID,session);
- if (existed != null){
- session = existed;
- }
- }
- return session.poll(timeout, unit);
- } finally {
- boolean removed = handoff.remove(socketID, session);
- if (removed){
- session.clear();
- queuePool.accept(session);
- }
- }
- }
-
-
- final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE);
-
- /**
- * single receive, run in the receiverThread, see {@link #start()}
- * <ul>
- * <li>Receives UDP packets from the network</li>
- * <li>Converts them to UDT packets</li>
- * <li>dispatches the UDT packets according to their destination ID.</li>
- * </ul>
- * @throws IOException
- */
- private long lastDestID=-1;
- private UDTSession lastSession;
-
- private int n=0;
-
- private final Object lock=new Object();
-
- protected void doReceive()throws IOException{
- while(!stopped){
- try{
- //will block until a packet is received or timeout has expired
- dgSocket.receive(dp);
- UDTSocketAddress peer= null;
- int l=dp.getLength();
- UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
- lastPacket=packet;
- //handle connection handshake
- if(packet.isConnectionHandshake()){
- synchronized(lock){
- Long id=Long.valueOf(packet.getDestinationID());
- UDTSession session=sessions.get(id);
- if(session==null){ // What about DOS?
- session=new ServerSession(dp,this);
- addSession(session.getSocketID(),session);
- //TODO need to check peer to avoid duplicate server session
- if(serverSocketMode){
- logger.fine("Pooling new request.");
-// sessionHandoff.put(session); // blocking method, what about offer?
- BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID());
- if (queue != null){
- boolean success = queue.offer(session);
- if (success){
- logger.fine("Request taken for processing.");
- } else {
- logger.fine("Request discarded, queue full.");
- }
- } else {
- logger.fine("No ServerSocket listening at socketID: "
- + session.getSocketID()
- + "to answer request");
- }
- }
- }
- peer.setSocketID(((ConnectionHandshake)packet).getSocketID());
- peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(),
- ((ConnectionHandshake)packet).getSocketID());
- session.received(packet,peer);
- }
- }
- else{
- //dispatch to existing session
- long dest=packet.getDestinationID();
- UDTSession session;
- if(dest==lastDestID){
- session=lastSession;
- }
- else{
- session=sessions.get(dest);
- lastSession=session;
- lastDestID=dest;
- }
- if(session==null){
- n++;
- if(n%100==1){
- logger.warning("Unknown session <"+dest
- +"> requested from <"+peer+"> packet type "
- +packet.getClass().getName());
- }
- }
- else{
- session.received(packet,peer);
- }
- }
- }catch(SocketException ex){
- logger.log(Level.INFO, "SocketException: "+ex.getMessage());
- }catch(SocketTimeoutException ste){
- //can safely ignore... we will retry until the endpoint is stopped
- }catch(Exception ex){
- logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex);
- }
- }
- }
-
- protected void doSend(UDTPacket packet)throws IOException{
- byte[]data=packet.getEncoded();
- DatagramPacket dgp = packet.getSession().getDatagram();
- dgp.setData(data);
- dgSocket.send(dgp);
- }
-
- public String toString(){
- return "UDPEndpoint port="+port;
- }
-
- public void sendRaw(DatagramPacket p)throws IOException{
- dgSocket.send(p);
- }
-}
Copied: udt-java/skunk/src/main/java/udt/UDPMultiplexer.java (from rev 60, udt-java/skunk/src/main/java/udt/UDPEndPoint.java)
===================================================================
--- udt-java/skunk/src/main/java/udt/UDPMultiplexer.java (rev 0)
+++ udt-java/skunk/src/main/java/udt/UDPMultiplexer.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -0,0 +1,540 @@
+/*********************************************************************************
+ * Copyright (c) 2010 Forschungszentrum Juelich GmbH
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * (1) Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the disclaimer at the end. Redistributions in
+ * binary form must reproduce the above copyright notice, this list of
+ * conditions and the following disclaimer in the documentation and/or other
+ * materials provided with the distribution.
+ *
+ * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * DISCLAIMER
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *********************************************************************************/
+
+package udt;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import udt.packets.ConnectionHandshake;
+import udt.packets.UDTSocketAddress;
+import udt.packets.PacketFactory;
+import udt.util.ObjectPool;
+import udt.util.UDTThreadFactory;
+
+/**
+ * the UDPMultiplexer takes care of sending and receiving UDP network packets,
+ * dispatching them to the correct {@link UDTSession}
+ */
+public class UDPMultiplexer {
+
+ //class fields
+    // Registered under this class's own name (it was previously filed under ClientSession,
+    // which put the multiplexer's log output in the wrong category).
+    private static final Logger logger=Logger.getLogger(UDPMultiplexer.class.getName());
+ public static final int DATAGRAM_SIZE=1400;
+
+
+ //class methods
+ private static final WeakHashMap<SocketAddress, UDPMultiplexer> localEndpoints
+ = new WeakHashMap<SocketAddress, UDPMultiplexer>();
+
+ public static UDPMultiplexer get(DatagramSocket socket){
+ SocketAddress localInetSocketAddress = null;
+ UDPMultiplexer result = null;
+ if ( socket.isBound()){
+ SocketAddress sa = socket.getLocalSocketAddress();
+ if ( sa instanceof InetSocketAddress ){
+ localInetSocketAddress = (InetSocketAddress) sa;
+ } else {
+ // Must be a special DatagramSocket impl or extended.
+ localInetSocketAddress =
+ new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+ }
+ synchronized (localEndpoints){
+ result = localEndpoints.get(localInetSocketAddress);
+ }
+ }
+ if (result != null) return result;
+ try {
+ result = new UDPMultiplexer(socket);
+ if (localInetSocketAddress == null){
+ // The DatagramSocket was unbound, it should be bound now.
+ localInetSocketAddress =
+ new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+ }
+ } catch (SocketException ex) {
+ Logger.getLogger(UDPMultiplexer.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ if (result != null){
+ synchronized (localEndpoints){
+ UDPMultiplexer exists = localEndpoints.get(localInetSocketAddress);
+ if (exists != null && exists.getSocket().equals(socket)) result = exists;
+ // Only cache if a record doesn't already exist.
+ else if (exists == null) localEndpoints.put(localInetSocketAddress, result);
+ }
+ }
+ return result; // may be null.
+ }
+
+
+
+    /**
+     * Convenience overload: wraps the address and port in an {@link InetSocketAddress}
+     * and delegates to {@link #get(SocketAddress)}.
+     *
+     * @param localAddress - the local address
+     * @param localPort - the local port
+     * @return the result of {@link #get(SocketAddress)}, which may be <code>null</code>
+     */
+    public static UDPMultiplexer get(InetAddress localAddress, int localPort){
+        return get(new InetSocketAddress(localAddress, localPort));
+    }
+
+ public static UDPMultiplexer get(SocketAddress localSocketAddress){
+ InetSocketAddress localInetSocketAddress = null;
+ if (localSocketAddress instanceof InetSocketAddress){
+ localInetSocketAddress = (InetSocketAddress) localSocketAddress;
+ } else if (localSocketAddress instanceof UDTSocketAddress){
+ UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress;
+ localInetSocketAddress =
+ new InetSocketAddress(udtSA.getAddress(), udtSA.getPort());
+ }
+ if (localInetSocketAddress == null) return null;
+ UDPMultiplexer result = null;
+ synchronized (localEndpoints){
+ result = localEndpoints.get(localInetSocketAddress);
+ }
+ if (result != null) return result;
+ try {
+ result = new UDPMultiplexer(localInetSocketAddress);
+ if (localInetSocketAddress.getPort() == 0 ||
+ localInetSocketAddress.getAddress().isAnyLocalAddress()){
+ // ephemeral port or wildcard address, bind operation is complete.
+ localInetSocketAddress =
+ new InetSocketAddress(result.getLocalAddress(), result.getLocalPort());
+ }
+ } catch (SocketException ex) {
+ logger.log(Level.SEVERE, null, ex);
+ } catch (UnknownHostException ex) {
+ logger.log(Level.SEVERE, null, ex);
+ }
+ if (result != null){
+ synchronized (localEndpoints){
+ UDPMultiplexer exists = localEndpoints.get(localInetSocketAddress);
+ if (exists != null) result = exists;
+ else localEndpoints.put(localInetSocketAddress, result);
+ }
+ }
+ return result; // may be null.
+ }
+
+    /**
+     * Adds a custom endpoint to the pool, keyed by the local socket address of its
+     * datagram socket. An entry already stored under that address is replaced.
+     *
+     * @param endpoint - the endpoint to add
+     */
+    public static void put(UDPMultiplexer endpoint){
+        synchronized (localEndpoints){
+            localEndpoints.put(endpoint.getSocket().getLocalSocketAddress(), endpoint);
+        }
+    }
+
+ //object fields
+ private final int port;
+
+ private final DatagramSocket dgSocket;
+
+ //active sessions keyed by socket ID
+ private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>();
+
+ //last received packet
+ private UDTPacket lastPacket;
+
+ //if the endpoint is configured for a server socket,
+ //this queue is used to handoff new UDTSessions to the application
+ private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>();
+
+ private final ObjectPool<BlockingQueue<UDTSession>> queuePool
+ = new ObjectPool<BlockingQueue<UDTSession>>(20);
+
+ private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff
+ = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120);
+
+ // registered sockets
+ private final Set<Integer> registeredSockets = new HashSet<Integer>(120);
+ // registered sockets lock
+ private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+ private final Lock readSocketLock = rwl.readLock();
+ private final Lock writeSocketLock = rwl.writeLock();
+
+
+ private boolean serverSocketMode=false;
+
+ //has the endpoint been stopped?
+ private volatile boolean stopped=false;
+
+ private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000));
+
+
+    /**
+     * Creates an endpoint on top of an existing socket. An unbound socket is bound
+     * via <code>bind(null)</code> first; {@link #configureSocket()} is not applied here.
+     *
+     * @param socket - a UDP datagram socket
+     * @throws SocketException if binding the socket fails
+     */
+    protected UDPMultiplexer(DatagramSocket socket) throws SocketException{
+        if (!socket.isBound()){
+            socket.bind(null);
+        }
+        this.dgSocket = socket;
+        this.port = socket.getLocalPort();
+    }
+
+    /**
+     * Binds to a system-chosen (ephemeral) port; delegates to the two-argument
+     * constructor with port zero.
+     *
+     * @param localAddress - the local host address
+     * @throws SocketException
+     * @throws UnknownHostException
+     */
+    private UDPMultiplexer(InetAddress localAddress)throws SocketException, UnknownHostException{
+        this(localAddress, 0);
+    }
+
+    /**
+     * Bind to the given address and port.
+     *
+     * @param localAddress - the local address to bind to; <code>null</code> binds to the
+     *        wildcard address
+     * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port.
+     * @throws SocketException if the socket cannot be opened, bound or configured
+     * @throws UnknownHostException
+     */
+    private UDPMultiplexer(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{
+        // The null check used to be inverted: a non-null address was silently ignored
+        // and the socket always ended up on the wildcard address.
+        if(localAddress!=null){
+            dgSocket=new DatagramSocket(localPort, localAddress);
+        }else{
+            dgSocket=new DatagramSocket(localPort);
+        }
+        // equals localPort when a fixed port was requested, otherwise the ephemeral port
+        port=dgSocket.getLocalPort();
+
+        configureSocket();
+    }
+
+    /**
+     * Opens a new datagram socket bound to the given socket address, records the
+     * port actually bound and applies {@link #configureSocket()}.
+     *
+     * @param localSocketAddress - the local address and port to bind to
+     * @throws SocketException
+     * @throws UnknownHostException
+     */
+    private UDPMultiplexer (InetSocketAddress localSocketAddress)
+            throws SocketException, UnknownHostException {
+        this.dgSocket = new DatagramSocket(localSocketAddress);
+        this.port = this.dgSocket.getLocalPort();
+        configureSocket();
+    }
+
+ protected void configureSocket()throws SocketException{
+ //set a time out to avoid blocking in doReceive()
+ dgSocket.setSoTimeout(100000);
+ //buffer size
+ dgSocket.setReceiveBufferSize(128*1024);
+ dgSocket.setReuseAddress(false);
+ }
+
+    /**
+     * Binds to the given port without naming a local address (a <code>null</code>
+     * address is handed to the two-argument constructor).
+     *
+     * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port.
+     * @throws SocketException
+     * @throws UnknownHostException
+     */
+    public UDPMultiplexer(int localPort)throws SocketException, UnknownHostException{
+        this(null, localPort);
+    }
+
+    /**
+     * Binds to an ephemeral port without naming a local address (delegates with a
+     * <code>null</code> address and port zero).
+     *
+     * @throws SocketException
+     * @throws UnknownHostException
+     */
+    public UDPMultiplexer()throws SocketException, UnknownHostException{
+        this(null, 0);
+    }
+
+    /**
+     * Starts the endpoint by launching a daemon thread that runs {@link #doReceive()}.
+     * If the serverSocketModeEnabled flag is <code>true</code>, new connections are
+     * offered to the hand-off queue of the matching socket ID, where the application
+     * can pick them up through accept.
+     *
+     * @param serverSocketModeEnabled - whether new sessions are handed off to the application
+     */
+    public void start(boolean serverSocketModeEnabled){
+        serverSocketMode=serverSocketModeEnabled;
+        Thread receiverThread=UDTThreadFactory.get().newThread(new Runnable(){
+            public void run(){
+                try{
+                    doReceive();
+                }catch(Exception ex){
+                    logger.log(Level.WARNING,"",ex);
+                }
+            }
+        });
+        receiverThread.setName("UDPEndpoint-"+receiverThread.getName());
+        receiverThread.setDaemon(true);
+        receiverThread.start();
+        logger.info("UDTEndpoint started.");
+    }
+
+    /**
+     * Starts the endpoint with server socket mode switched off.
+     */
+    public void start(){
+        final boolean serverSocketModeEnabled=false;
+        start(serverSocketModeEnabled);
+    }
+
+    /**
+     * Stops the endpoint: raises the stopped flag that ends the loop in
+     * {@link #doReceive()} and closes the datagram socket.
+     */
+    public void stop(){
+        this.stopped=true;
+        this.dgSocket.close();
+    }
+
+ /**
+ * Provides assistance to a socket to determine a random socket id,
+ * every caller receives a unique value. This value is unique at the
+ * time of calling, however it may not be at registration time.
+ *
+ * This socketID has not been registered, all socket ID's must be
+ * registered or connection will fail.
+ * @return
+ */
+ public int getUniqueSocketID(){
+ Integer socketID = nextSocketID.getAndIncrement();
+ try{
+ readSocketLock.lock();
+ while (registeredSockets.contains(socketID)){
+ socketID = nextSocketID.getAndIncrement();
+ }
+ return socketID; // should we register it?
+ } finally {
+ readSocketLock.unlock();
+ }
+ }
+
+    /**
+     * Registers a socket ID with this endpoint.
+     *
+     * @param socketID - the socket ID to register
+     * @param socket - the socket claiming the ID; its endpoint must be this endpoint
+     * @throws SocketException if the socket belongs to another endpoint or the
+     *         socket ID is already registered
+     */
+    void registerSocketID(int socketID, UDTSocket socket) throws SocketException {
+        if (!equals(socket.getEndpoint())) throw new SocketException (
+                "Socket doesn't originate for this endpoint: "
+                + socket.toString());
+        // Acquire before entering the try block so that unlock() in the finally
+        // clause only ever runs when the lock is actually held.
+        writeSocketLock.lock();
+        try {
+            // add() reports whether the ID was new, so no separate contains() is needed
+            if (!registeredSockets.add(socketID)){
+                throw new SocketException("Already registered, Socket ID: " +socketID);
+            }
+        }finally{
+            writeSocketLock.unlock();
+        }
+    }
+
+    /**
+     * Reports the local port, queried from the underlying datagram socket.
+     *
+     * @return the port which this endpoint's socket is bound to
+     */
+    public int getLocalPort() {
+        return dgSocket.getLocalPort();
+    }
+    /**
+     * Reports the local address, queried from the underlying datagram socket.
+     *
+     * @return the local address to which the socket is bound
+     */
+    public InetAddress getLocalAddress(){
+        return dgSocket.getLocalAddress();
+    }
+
+    /**
+     * @return the datagram socket this endpoint sends and receives on
+     */
+    DatagramSocket getSocket(){
+        return this.dgSocket;
+    }
+
+    /**
+     * @return the packet most recently stored by {@link #doReceive()}, or
+     *         <code>null</code> if none has been stored yet
+     */
+    UDTPacket getLastPacket(){
+        return this.lastPacket;
+    }
+
+    /**
+     * Stores a session under the given ID, replacing any session already stored
+     * under it, and logs the ID at INFO level.
+     *
+     * @param destinationID - the socket ID the session is keyed by
+     * @param session - the session to store
+     */
+    public void addSession(Integer destinationID,UDTSession session){
+        logger.log(Level.INFO, "Storing session <{0}>", destinationID);
+        this.sessions.put(destinationID, session);
+    }
+
+ public UDTSession getSession(Long destinationID){
+ return sessions.get(destinationID);
+ }
+
+    /**
+     * @return the value view of the session map (not a copy)
+     */
+    public Collection<UDTSession> getSessions(){
+        return this.sessions.values();
+    }
+
+    /**
+     * Waits up to the given time for a new connection addressed to a socket ID.
+     * A hand-off queue is published for the socket ID if none exists yet (taken from
+     * the queue pool, or freshly created when the pool returns none). On the way out
+     * the queue is withdrawn again and, if this call withdrew it, cleared and given
+     * back to the pool.
+     *
+     * @param timeout - the time to wait
+     * @param unit - the {@link TimeUnit}
+     * @param socketID - the socket id.
+     * @return a new {@link UDTSession}, or <code>null</code> if none arrived in time
+     * @throws InterruptedException
+     */
+    protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{
+        BlockingQueue<UDTSession> queue = handoff.get(socketID);
+        try {
+            if (queue == null){
+                queue = queuePool.get();
+                if (queue == null) {
+                    queue = new ArrayBlockingQueue<UDTSession>(1);
+                }
+                BlockingQueue<UDTSession> published = handoff.putIfAbsent(socketID,queue);
+                if (published != null){
+                    // somebody else published a queue first; wait on theirs
+                    queue = published;
+                }
+            }
+            return queue.poll(timeout, unit);
+        } finally {
+            if (handoff.remove(socketID, queue)){
+                queue.clear();
+                queuePool.accept(queue);
+            }
+        }
+    }
+
+
+ final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE);
+
+ /**
+ * single receive, run in the receiverThread, see {@link #start()}
+ * <ul>
+ * <li>Receives UDP packets from the network</li>
+ * <li>Converts them to UDT packets</li>
+ * <li>dispatches the UDT packets according to their destination ID.</li>
+ * </ul>
+ * @throws IOException
+ */
+ private long lastDestID=-1;
+ private UDTSession lastSession;
+
+ private int n=0;
+
+ private final Object lock=new Object();
+
+    /**
+     * Receive loop, run on the receiver thread until the endpoint is stopped.
+     * <ul>
+     * <li>Receives UDP packets from the network</li>
+     * <li>Converts them to UDT packets</li>
+     * <li>Dispatches the UDT packets according to their destination ID; a connection
+     * handshake for an unknown destination creates a new {@link ServerSession}.</li>
+     * </ul>
+     * Exceptions raised while handling a packet are logged and the loop continues.
+     *
+     * @throws IOException
+     */
+    protected void doReceive()throws IOException{
+        while(!stopped){
+            try{
+                //will block until a packet is received or timeout has expired
+                dgSocket.receive(dp);
+                // Sender of this datagram. The socket ID is not known yet, so 0 is used,
+                // the same placeholder ServerSession uses for its initial peer address.
+                UDTSocketAddress peer=new UDTSocketAddress(dp.getAddress(), dp.getPort(), 0);
+                int l=dp.getLength();
+                UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
+                lastPacket=packet;
+                //handle connection handshake
+                if(packet.isConnectionHandshake()){
+                    synchronized(lock){
+                        // sessions is keyed by Integer; a Long key never matches an
+                        // Integer key, which made every handshake look like a new peer
+                        Integer id=Integer.valueOf((int)packet.getDestinationID());
+                        UDTSession session=sessions.get(id);
+                        if(session==null){ // What about DOS?
+                            session=new ServerSession(dp,this);
+                            addSession(session.getSocketID(),session);
+                            //TODO need to check peer to avoid duplicate server session
+                            if(serverSocketMode){
+                                logger.fine("Pooling new request.");
+                                BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID());
+                                if (queue != null){
+                                    // non-blocking: the receive loop must not stall on a full queue
+                                    boolean success = queue.offer(session);
+                                    if (success){
+                                        logger.fine("Request taken for processing.");
+                                    } else {
+                                        logger.fine("Request discarded, queue full.");
+                                    }
+                                } else {
+                                    logger.fine("No ServerSocket listening at socketID: "
+                                            + session.getSocketID()
+                                            + " to answer request");
+                                }
+                            }
+                        }
+                        // the handshake carries the peer's socket ID (the previous code
+                        // called a setter on a null reference here and always failed)
+                        peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(),
+                                ((ConnectionHandshake)packet).getSocketID());
+                        session.received(packet,peer);
+                    }
+                }
+                else{
+                    //dispatch to existing session
+                    long dest=packet.getDestinationID();
+                    UDTSession session;
+                    if(dest==lastDestID){
+                        //same destination as the previous packet: skip the map lookup
+                        session=lastSession;
+                    }
+                    else{
+                        session=sessions.get(Integer.valueOf((int)dest));
+                        lastSession=session;
+                        lastDestID=dest;
+                    }
+                    if(session==null){
+                        n++;
+                        //only log every 100th miss to avoid flooding the log
+                        if(n%100==1){
+                            logger.warning("Unknown session <"+dest
+                                    +"> requested from <"+peer+"> packet type "
+                                    +packet.getClass().getName());
+                        }
+                    }
+                    else{
+                        session.received(packet,peer);
+                    }
+                }
+            }catch(SocketException ex){
+                logger.log(Level.INFO, "SocketException: "+ex.getMessage());
+            }catch(SocketTimeoutException ste){
+                //can safely ignore... we will retry until the endpoint is stopped
+            }catch(Exception ex){
+                logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex);
+            }
+        }
+    }
+
+    /**
+     * Sends a UDT packet: its encoded bytes are placed into the datagram obtained
+     * from the packet's session, which is then sent on this endpoint's socket.
+     *
+     * @param packet - the packet to send
+     * @throws IOException if the socket send fails
+     */
+    protected void doSend(UDTPacket packet)throws IOException{
+        byte[] encoded=packet.getEncoded();
+        DatagramPacket datagram=packet.getSession().getDatagram();
+        datagram.setData(encoded);
+        dgSocket.send(datagram);
+    }
+
+    /**
+     * @return a short description containing the local port
+     */
+    @Override
+    public String toString(){
+        return "UDPEndpoint port="+port;
+    }
+
+    /**
+     * Sends an already assembled datagram on this endpoint's socket, unchanged.
+     *
+     * @param p - the datagram to send
+     * @throws IOException if the socket send fails
+     */
+    public void sendRaw(DatagramPacket p)throws IOException{
+        this.dgSocket.send(p);
+    }
+}
Modified: udt-java/skunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -48,23 +48,23 @@
public class UDTClient {
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
- private final UDPEndPoint clientEndpoint;
+ private final UDPMultiplexer clientEndpoint;
private ClientSession clientSession;
public UDTClient(InetAddress address, int localport)throws SocketException, UnknownHostException{
//create endpoint
- clientEndpoint= UDPEndPoint.get(address,localport);
+ clientEndpoint= UDPMultiplexer.get(address,localport);
logger.info("Created client endpoint on port "+localport);
}
public UDTClient(InetAddress address)throws SocketException, UnknownHostException{
//create endpoint
- clientEndpoint= UDPEndPoint.get(address, 0);
+ clientEndpoint= UDPMultiplexer.get(address, 0);
logger.info("Created client endpoint on port "+clientEndpoint.getLocalPort());
}
- public UDTClient(UDPEndPoint endpoint)throws SocketException, UnknownHostException{
+ public UDTClient(UDPMultiplexer endpoint)throws SocketException, UnknownHostException{
clientEndpoint=endpoint;
}
@@ -153,7 +153,7 @@
return clientSession.getSocket().getOutputStream();
}
- public UDPEndPoint getEndpoint()throws IOException{
+ public UDPMultiplexer getEndpoint()throws IOException{
return clientEndpoint;
}
Modified: udt-java/skunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -175,7 +175,7 @@
statistics.setSendPeriod(packetSendingPeriod);
}
- private final long PS=UDPEndPoint.DATAGRAM_SIZE;
+ private final long PS=UDPMultiplexer.DATAGRAM_SIZE;
private final double BetaDivPS=0.0000015/PS;
//see spec page 16
@@ -184,7 +184,7 @@
double remaining=estimatedLinkCapacity-1000000.0/packetSendingPeriod;
if(remaining<=0){
- return 1.0/UDPEndPoint.DATAGRAM_SIZE;
+ return 1.0/UDPMultiplexer.DATAGRAM_SIZE;
}
else{
double exp=Math.ceil(Math.log10(remaining*PS*8));
Modified: udt-java/skunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -67,7 +67,7 @@
private static final Logger logger=Logger.getLogger(UDTReceiver.class.getName());
- private final UDPEndPoint endpoint;
+ private final UDPMultiplexer endpoint;
private final UDTSession session;
@@ -159,7 +159,7 @@
* create a receiver with a valid {@link UDTSession}
* @param session
*/
- public UDTReceiver(UDTSession session,UDPEndPoint endpoint){
+ public UDTReceiver(UDTSession session,UDPMultiplexer endpoint){
this.endpoint = endpoint;
this.session=session;
this.sessionUpSince=System.currentTimeMillis();
Modified: udt-java/skunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -67,7 +67,7 @@
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
- private final UDPEndPoint endpoint;
+ private final UDPMultiplexer endpoint;
private final UDTSession session;
private final UDTStatistics statistics;
@@ -117,7 +117,7 @@
private final boolean storeStatistics;
private final int chunksize;
- public UDTSender(UDTSession session,UDPEndPoint endpoint){
+ public UDTSender(UDTSession session,UDPMultiplexer endpoint){
if(!session.isReady())throw new IllegalStateException("UDTSession is not ready.");
this.endpoint= endpoint;
this.session=session;
Modified: udt-java/skunk/src/main/java/udt/UDTServerSocket.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -48,7 +48,7 @@
public class UDTServerSocket extends ServerSocket {
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
- private volatile UDPEndPoint endpoint;
+ private volatile UDPMultiplexer endpoint;
private volatile InetAddress localAdd;
private volatile int locPort;
private volatile SocketAddress localSocketAddress;
@@ -64,7 +64,7 @@
*/
public UDTServerSocket(InetAddress localAddress, int port)throws UnknownHostException, IOException{
super();
- endpoint= UDPEndPoint.get(localAddress,port);
+ endpoint= UDPMultiplexer.get(localAddress,port);
localAdd = localAddress;
locPort = port;
bound = true;
@@ -105,7 +105,7 @@
throw new IOException("UDTSession was null");
}
- public UDPEndPoint getEndpoint(){
+ public UDPMultiplexer getEndpoint(){
return endpoint;
}
Modified: udt-java/skunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -91,7 +91,7 @@
protected int localPort;
- public static final int DEFAULT_DATAGRAM_SIZE=UDPEndPoint.DATAGRAM_SIZE;
+ public static final int DEFAULT_DATAGRAM_SIZE=UDPMultiplexer.DATAGRAM_SIZE;
/**
* key for a system property defining the CC class to be used
Modified: udt-java/skunk/src/main/java/udt/UDTSocket.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -62,7 +62,7 @@
= new ArrayList<UDTSocketAddress>(120);
//endpoint
- private volatile UDPEndPoint endpoint;
+ private volatile UDPMultiplexer endpoint;
private volatile boolean active;
private volatile boolean connected;
@@ -90,7 +90,7 @@
* @param endpoint
* @throws SocketException,UnknownHostException
*/
- UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{
+ UDTSocket(UDPMultiplexer endpoint, UDTSession session)throws SocketException,UnknownHostException{
super();
this.endpoint=endpoint;
this.session=session;
@@ -105,7 +105,7 @@
public UDTSocket(InetAddress host, int port ) throws SocketException,
UnknownHostException{
super();
- this.endpoint = UDPEndPoint.get(host, port);
+ this.endpoint = UDPMultiplexer.get(host, port);
this.session = null;
this.receiver = null;
this.sender = null;
@@ -193,7 +193,7 @@
if (boundSockets.contains(bindpoint)) throw
new IOException("A socket is already bound to this address");
}
- endpoint = UDPEndPoint.get(bindpoint);
+ endpoint = UDPMultiplexer.get(bindpoint);
if (endpoint == null) throw new SocketException("Failed to bind to UDPEndPoint");
bound = true;
}
@@ -517,7 +517,7 @@
return active;
}
- public UDPEndPoint getEndpoint() {
+ public UDPMultiplexer getEndpoint() {
return endpoint;
}
Modified: udt-java/skunk/src/main/java/udt/util/Util.java
===================================================================
--- udt-java/skunk/src/main/java/udt/util/Util.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/util/Util.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -40,7 +40,7 @@
import java.net.InetAddress;
import java.security.MessageDigest;
-import udt.UDPEndPoint;
+import udt.UDPMultiplexer;
/**
* helper methods
@@ -150,7 +150,7 @@
* @return the local port that can now be accessed by the client
* @throws IOException
*/
- public static void doHolePunch(UDPEndPoint endpoint,InetAddress client, int clientPort)throws IOException{
+ public static void doHolePunch(UDPMultiplexer endpoint,InetAddress client, int clientPort)throws IOException{
DatagramPacket p=new DatagramPacket(new byte[1],1);
p.setAddress(client);
p.setPort(clientPort);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <pe...@us...> - 2011-08-05 04:24:09
|
Revision: 60
http://udt-java.svn.sourceforge.net/udt-java/?rev=60&view=rev
Author: pete_
Date: 2011-08-05 04:24:00 +0000 (Fri, 05 Aug 2011)
Log Message:
-----------
Experimental changes, partial implementation of java sockets and UDP multiplexing.
Modified Paths:
--------------
udt-java/skunk/src/main/java/udt/ClientSession.java
udt-java/skunk/src/main/java/udt/ServerSession.java
udt-java/skunk/src/main/java/udt/UDPEndPoint.java
udt-java/skunk/src/main/java/udt/UDTClient.java
udt-java/skunk/src/main/java/udt/UDTCongestionControl.java
udt-java/skunk/src/main/java/udt/UDTInputStream.java
udt-java/skunk/src/main/java/udt/UDTReceiver.java
udt-java/skunk/src/main/java/udt/UDTSender.java
udt-java/skunk/src/main/java/udt/UDTServerSocket.java
udt-java/skunk/src/main/java/udt/UDTSession.java
udt-java/skunk/src/main/java/udt/UDTSocket.java
udt-java/skunk/src/main/java/udt/packets/ConnectionHandshake.java
udt-java/skunk/src/main/java/udt/packets/DataPacket.java
udt-java/skunk/src/main/java/udt/packets/PacketFactory.java
udt-java/skunk/src/main/java/udt/packets/PacketUtil.java
udt-java/skunk/src/main/java/udt/unicore/FufexSend.java
udt-java/skunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/skunk/src/main/java/udt/util/ReceiveFile.java
udt-java/skunk/src/main/java/udt/util/SendFile.java
udt-java/skunk/src/main/java/udt/util/SequenceNumber.java
udt-java/skunk/src/test/java/echo/EchoServer.java
udt-java/skunk/src/test/java/echo/TestEchoServer.java
udt-java/skunk/src/test/java/echo/TestEchoServerMultiClient.java
udt-java/skunk/src/test/java/udt/TestUDTServerSocket.java
udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java
udt-java/skunk/src/test/java/udt/performance/TestUDTLargeData.java
Added Paths:
-----------
udt-java/skunk/src/main/java/udt/packets/UDTSocketAddress.java
udt-java/skunk/src/main/java/udt/util/ObjectPool.java
udt-java/skunk/src/main/java/udt/util/Recycler.java
Removed Paths:
-------------
udt-java/skunk/src/main/java/udt/packets/Destination.java
Property Changed:
----------------
udt-java/skunk/
Property changes on: udt-java/skunk
___________________________________________________________________
Added: svn:ignore
+ target
Modified: udt-java/skunk/src/main/java/udt/ClientSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -38,7 +38,7 @@
import java.util.logging.Logger;
import udt.packets.ConnectionHandshake;
-import udt.packets.Destination;
+import udt.packets.UDTSocketAddress;
import udt.packets.Shutdown;
import udt.util.SequenceNumber;
@@ -52,7 +52,7 @@
private UDPEndPoint endPoint;
- public ClientSession(UDPEndPoint endPoint, Destination dest)throws SocketException{
+ public ClientSession(UDPEndPoint endPoint, UDTSocketAddress dest)throws SocketException{
super("ClientSession localPort="+endPoint.getLocalPort(),dest);
this.endPoint=endPoint;
logger.info("Created "+toString());
@@ -78,7 +78,7 @@
}
@Override
- public void received(UDTPacket packet, Destination peer) {
+ public void received(UDTPacket packet, UDTSocketAddress peer) {
lastPacket=packet;
@@ -91,7 +91,7 @@
if(hs.getConnectionType()==1){
try{
//TODO validate parameters sent by peer
- long peerSocketID=hs.getSocketID();
+ int peerSocketID=hs.getSocketID();
destination.setSocketID(peerSocketID);
sendConfirmation(hs);
}catch(Exception ex){
@@ -103,10 +103,12 @@
else{
try{
//TODO validate parameters sent by peer
- long peerSocketID=hs.getSocketID();
+ int peerSocketID=hs.getSocketID();
destination.setSocketID(peerSocketID);
setState(ready);
+ if (socket == null){
socket=new UDTSocket(endPoint,this);
+ }
}catch(Exception ex){
logger.log(Level.WARNING,"Error creating socket",ex);
setState(invalid);
@@ -146,9 +148,7 @@
ConnectionHandshake handshake = new ConnectionHandshake();
handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR);
handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM);
- long initialSequenceNo=SequenceNumber.random();
- setInitialSequenceNumber(initialSequenceNo);
- handshake.setInitialSeqNo(initialSequenceNo);
+ handshake.setInitialSeqNo(getCurrentSequenceNumber());
handshake.setPacketSize(getDatagramSize());
handshake.setSocketID(mySocketID);
handshake.setMaxFlowWndSize(flowWindowSize);
Modified: udt-java/skunk/src/main/java/udt/ServerSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -40,7 +40,7 @@
import java.util.logging.Logger;
import udt.packets.ConnectionHandshake;
-import udt.packets.Destination;
+import udt.packets.UDTSocketAddress;
import udt.packets.KeepAlive;
import udt.packets.Shutdown;
@@ -57,7 +57,7 @@
private UDTPacket lastPacket;
public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{
- super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new Destination(dp.getAddress(),dp.getPort()));
+ super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new UDTSocketAddress(dp.getAddress(),dp.getPort(),0));
this.endPoint=endPoint;
logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort());
}
@@ -65,7 +65,7 @@
int n_handshake=0;
@Override
- public void received(UDTPacket packet, Destination peer){
+ public void received(UDTPacket packet, UDTSocketAddress peer){
lastPacket=packet;
if(packet instanceof ConnectionHandshake) {
@@ -83,7 +83,9 @@
n_handshake++;
try{
setState(ready);
+ if (socket == null){
socket=new UDTSocket(endPoint, this);
+ }
cc.init();
}catch(Exception uhe){
//session is invalid
@@ -166,7 +168,7 @@
long clientBufferSize=handshake.getPacketSize();
long myBufferSize=getDatagramSize();
long bufferSize=Math.min(clientBufferSize, myBufferSize);
- long initialSequenceNumber=handshake.getInitialSeqNo();
+ int initialSequenceNumber=(int) handshake.getInitialSeqNo();
setInitialSequenceNumber(initialSequenceNumber);
setDatagramSize((int)bufferSize);
responseHandshake.setPacketSize(bufferSize);
Modified: udt-java/skunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -36,20 +36,34 @@
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import udt.packets.ConnectionHandshake;
-import udt.packets.Destination;
+import udt.packets.UDTSocketAddress;
import udt.packets.PacketFactory;
+import udt.util.ObjectPool;
import udt.util.UDTThreadFactory;
/**
@@ -58,14 +72,116 @@
*/
public class UDPEndPoint {
+ //class fields
private static final Logger logger=Logger.getLogger(ClientSession.class.getName());
+ public static final int DATAGRAM_SIZE=1400;
+
+ //class methods
+ private static final WeakHashMap<SocketAddress, UDPEndPoint> localEndpoints
+ = new WeakHashMap<SocketAddress, UDPEndPoint>();
+
+ public static UDPEndPoint get(DatagramSocket socket){
+ SocketAddress localInetSocketAddress = null;
+ UDPEndPoint result = null;
+ if ( socket.isBound()){
+ SocketAddress sa = socket.getLocalSocketAddress();
+ if ( sa instanceof InetSocketAddress ){
+ localInetSocketAddress = (InetSocketAddress) sa;
+ } else {
+ // Must be a special DatagramSocket impl or extended.
+ localInetSocketAddress =
+ new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+ }
+ synchronized (localEndpoints){
+ result = localEndpoints.get(localInetSocketAddress);
+ }
+ }
+ if (result != null) return result;
+ try {
+ result = new UDPEndPoint(socket);
+ if (localInetSocketAddress == null){
+ // The DatagramSocket was unbound, it should be bound now.
+ localInetSocketAddress =
+ new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+ }
+ } catch (SocketException ex) {
+ Logger.getLogger(UDPEndPoint.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ if (result != null){
+ synchronized (localEndpoints){
+ UDPEndPoint exists = localEndpoints.get(localInetSocketAddress);
+ if (exists != null && exists.getSocket().equals(socket)) result = exists;
+ // Only cache if a record doesn't already exist.
+ else if (exists == null) localEndpoints.put(localInetSocketAddress, result);
+ }
+ }
+ return result; // may be null.
+ }
+
+
+
+ public static UDPEndPoint get(InetAddress localAddress, int localPort){
+ InetSocketAddress localInetSocketAddress = new InetSocketAddress(localAddress, localPort);
+ return get(localInetSocketAddress);
+ }
+
+ public static UDPEndPoint get(SocketAddress localSocketAddress){
+ InetSocketAddress localInetSocketAddress = null;
+ if (localSocketAddress instanceof InetSocketAddress){
+ localInetSocketAddress = (InetSocketAddress) localSocketAddress;
+ } else if (localSocketAddress instanceof UDTSocketAddress){
+ UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress;
+ localInetSocketAddress =
+ new InetSocketAddress(udtSA.getAddress(), udtSA.getPort());
+ }
+ if (localInetSocketAddress == null) return null;
+ UDPEndPoint result = null;
+ synchronized (localEndpoints){
+ result = localEndpoints.get(localInetSocketAddress);
+ }
+ if (result != null) return result;
+ try {
+ result = new UDPEndPoint(localInetSocketAddress);
+ if (localInetSocketAddress.getPort() == 0 ||
+ localInetSocketAddress.getAddress().isAnyLocalAddress()){
+ // ephemeral port or wildcard address, bind operation is complete.
+ localInetSocketAddress =
+ new InetSocketAddress(result.getLocalAddress(), result.getLocalPort());
+ }
+ } catch (SocketException ex) {
+ logger.log(Level.SEVERE, null, ex);
+ } catch (UnknownHostException ex) {
+ logger.log(Level.SEVERE, null, ex);
+ }
+ if (result != null){
+ synchronized (localEndpoints){
+ UDPEndPoint exists = localEndpoints.get(localInetSocketAddress);
+ if (exists != null) result = exists;
+ else localEndpoints.put(localInetSocketAddress, result);
+ }
+ }
+ return result; // may be null.
+ }
+
+ /**
+ * Allows a custom endpoint to be added to the pool.
+ * @param endpoint
+ */
+ public static void put(UDPEndPoint endpoint){
+ SocketAddress local = endpoint.getSocket().getLocalSocketAddress();
+ synchronized (localEndpoints){
+ localEndpoints.put(local, endpoint);
+ }
+ }
+
+ //object fields
private final int port;
private final DatagramSocket dgSocket;
//active sessions keyed by socket ID
- private final Map<Long,UDTSession>sessions=new ConcurrentHashMap<Long, UDTSession>();
+ private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>();
//last received packet
private UDTPacket lastPacket;
@@ -74,20 +190,39 @@
//this queue is used to handoff new UDTSessions to the application
private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>();
+ private final ObjectPool<BlockingQueue<UDTSession>> queuePool
+ = new ObjectPool<BlockingQueue<UDTSession>>(20);
+
+ private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff
+ = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120);
+
+ // registered sockets
+ private final Set<Integer> registeredSockets = new HashSet<Integer>(120);
+ // registered sockets lock
+ private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+ private final Lock readSocketLock = rwl.readLock();
+ private final Lock writeSocketLock = rwl.writeLock();
+
+
private boolean serverSocketMode=false;
//has the endpoint been stopped?
private volatile boolean stopped=false;
- public static final int DATAGRAM_SIZE=1400;
+ private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000));
+
/**
* create an endpoint on the given socket
*
* @param socket - a UDP datagram socket
+ * @throws SocketException
*/
- public UDPEndPoint(DatagramSocket socket){
+ protected UDPEndPoint(DatagramSocket socket) throws SocketException{
this.dgSocket=socket;
+ if (!socket.isBound()){
+ socket.bind(null);
+ }
port=dgSocket.getLocalPort();
}
@@ -97,7 +232,7 @@
* @throws SocketException
* @throws UnknownHostException
*/
- public UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{
+ private UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{
this(localAddress,0);
}
@@ -108,7 +243,7 @@
* @throws SocketException
* @throws UnknownHostException
*/
- public UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{
+ private UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{
if(localAddress==null){
dgSocket=new DatagramSocket(localPort, localAddress);
}else{
@@ -120,6 +255,13 @@
configureSocket();
}
+ private UDPEndPoint (InetSocketAddress localSocketAddress)
+ throws SocketException, UnknownHostException {
+ dgSocket = new DatagramSocket(localSocketAddress);
+ port = dgSocket.getLocalPort();
+ configureSocket();
+ }
+
protected void configureSocket()throws SocketException{
//set a time out to avoid blocking in doReceive()
dgSocket.setSoTimeout(100000);
@@ -184,6 +326,43 @@
}
/**
+ * Provides assistance to a socket to determine a random socket id,
+ * every caller receives a unique value. This value is unique at the
+ * time of calling, however it may not be at registration time.
+ *
+ * This socketID has not been registered, all socket ID's must be
+ * registered or connection will fail.
+ * @return
+ */
+ public int getUniqueSocketID(){
+ Integer socketID = nextSocketID.getAndIncrement();
+ try{
+ readSocketLock.lock();
+ while (registeredSockets.contains(socketID)){
+ socketID = nextSocketID.getAndIncrement();
+ }
+ return socketID; // should we register it?
+ } finally {
+ readSocketLock.unlock();
+ }
+ }
+
+ void registerSocketID(int socketID, UDTSocket socket) throws SocketException {
+ if (!equals(socket.getEndpoint())) throw new SocketException (
+ "Socket doesn't originate for this endpoint: "
+ + socket.toString());
+ try {
+ writeSocketLock.lock();
+ if (registeredSockets.contains(socketID)){
+ throw new SocketException("Already registered, Socket ID: " +socketID);
+ }
+ registeredSockets.add(socketID);
+ }finally{
+ writeSocketLock.unlock();
+ }
+ }
+
+ /**
* @return the port which this client is bound to
*/
public int getLocalPort() {
@@ -204,8 +383,8 @@
return lastPacket;
}
- public void addSession(Long destinationID,UDTSession session){
- logger.info("Storing session <"+destinationID+">");
+ public void addSession(Integer destinationID,UDTSession session){
+ logger.log(Level.INFO, "Storing session <{0}>", destinationID);
sessions.put(destinationID, session);
}
@@ -221,12 +400,33 @@
* wait the given time for a new connection
* @param timeout - the time to wait
* @param unit - the {@link TimeUnit}
+ * @param socketID - the socket id.
* @return a new {@link UDTSession}
* @throws InterruptedException
*/
- protected UDTSession accept(long timeout, TimeUnit unit)throws InterruptedException{
- return sessionHandoff.poll(timeout, unit);
+ protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{
+ //return sessionHandoff.poll(timeout, unit);
+ BlockingQueue<UDTSession> session = handoff.get(socketID);
+ try {
+ if (session == null){
+ session = queuePool.get();
+ if (session == null) {
+ session = new ArrayBlockingQueue<UDTSession>(1);
}
+ BlockingQueue<UDTSession> existed = handoff.putIfAbsent(socketID,session);
+ if (existed != null){
+ session = existed;
+ }
+ }
+ return session.poll(timeout, unit);
+ } finally {
+ boolean removed = handoff.remove(socketID, session);
+ if (removed){
+ session.clear();
+ queuePool.accept(session);
+ }
+ }
+ }
final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE);
@@ -250,32 +450,42 @@
protected void doReceive()throws IOException{
while(!stopped){
try{
- try{
-
//will block until a packet is received or timeout has expired
dgSocket.receive(dp);
-
- Destination peer=new Destination(dp.getAddress(), dp.getPort());
+ UDTSocketAddress peer= null;
int l=dp.getLength();
UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
lastPacket=packet;
-
//handle connection handshake
if(packet.isConnectionHandshake()){
synchronized(lock){
Long id=Long.valueOf(packet.getDestinationID());
UDTSession session=sessions.get(id);
- if(session==null){
+ if(session==null){ // What about DOS?
session=new ServerSession(dp,this);
addSession(session.getSocketID(),session);
//TODO need to check peer to avoid duplicate server session
if(serverSocketMode){
logger.fine("Pooling new request.");
- sessionHandoff.put(session);
+// sessionHandoff.put(session); // blocking method, what about offer?
+ BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID());
+ if (queue != null){
+ boolean success = queue.offer(session);
+ if (success){
logger.fine("Request taken for processing.");
+ } else {
+ logger.fine("Request discarded, queue full.");
}
+ } else {
+ logger.fine("No ServerSocket listening at socketID: "
+ + session.getSocketID()
+ + "to answer request");
}
+ }
+ }
peer.setSocketID(((ConnectionHandshake)packet).getSocketID());
+ peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(),
+ ((ConnectionHandshake)packet).getSocketID());
session.received(packet,peer);
}
}
@@ -294,7 +504,9 @@
if(session==null){
n++;
if(n%100==1){
- logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName());
+ logger.warning("Unknown session <"+dest
+ +"> requested from <"+peer+"> packet type "
+ +packet.getClass().getName());
}
}
else{
@@ -305,8 +517,6 @@
logger.log(Level.INFO, "SocketException: "+ex.getMessage());
}catch(SocketTimeoutException ste){
//can safely ignore... we will retry until the endpoint is stopped
- }
-
}catch(Exception ex){
logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex);
}
Modified: udt-java/skunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -33,13 +33,15 @@
package udt;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.logging.Level;
import java.util.logging.Logger;
-import udt.packets.Destination;
+import udt.packets.UDTSocketAddress;
import udt.packets.Shutdown;
import udt.util.UDTStatistics;
@@ -52,13 +54,13 @@
public UDTClient(InetAddress address, int localport)throws SocketException, UnknownHostException{
//create endpoint
- clientEndpoint=new UDPEndPoint(address,localport);
+ clientEndpoint= UDPEndPoint.get(address,localport);
logger.info("Created client endpoint on port "+localport);
}
public UDTClient(InetAddress address)throws SocketException, UnknownHostException{
//create endpoint
- clientEndpoint=new UDPEndPoint(address);
+ clientEndpoint= UDPEndPoint.get(address, 0);
logger.info("Created client endpoint on port "+clientEndpoint.getLocalPort());
}
@@ -75,7 +77,7 @@
*/
public void connect(String host, int port)throws InterruptedException, UnknownHostException, IOException{
InetAddress address=InetAddress.getByName(host);
- Destination destination=new Destination(address,port);
+ UDTSocketAddress destination= new UDTSocketAddress(address,port,0);
//create client session...
clientSession=new ClientSession(clientEndpoint,destination);
clientEndpoint.addSession(clientSession.getSocketID(), clientSession);
@@ -101,7 +103,7 @@
}
/**
- * sends the given data and waits for acknowledgement
+ * sends the given data and waits for acknowledgment
* @param data - the data to send
* @throws IOException
* @throws InterruptedException if interrupted while waiting for ack
@@ -143,11 +145,11 @@
}
}
- public UDTInputStream getInputStream()throws IOException{
+ public InputStream getInputStream()throws IOException{
return clientSession.getSocket().getInputStream();
}
- public UDTOutputStream getOutputStream()throws IOException{
+ public OutputStream getOutputStream()throws IOException{
return clientSession.getSocket().getOutputStream();
}
Modified: udt-java/skunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -65,7 +65,7 @@
public UDTCongestionControl(UDTSession session){
this.session=session;
this.statistics=session.getStatistics();
- lastDecreaseSeqNo=session.getInitialSequenceNumber()-1;
+ lastDecreaseSeqNo=session.getCurrentSequenceNumber()-1;
}
/* (non-Javadoc)
Modified: udt-java/skunk/src/main/java/udt/UDTInputStream.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTInputStream.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTInputStream.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -66,10 +66,10 @@
* @param socket - the {@link UDTSocket}
* @throws IOException
*/
- public UDTInputStream(UDTSocket socket)throws IOException{
+ UDTInputStream(UDTSocket socket)throws IOException{
this.socket=socket;
int capacity=socket!=null? 2 * socket.getSession().getFlowWindowSize() : 128 ;
- long initialSequenceNum=socket!=null?socket.getSession().getInitialSequenceNumber():1;
+ int initialSequenceNum=socket!=null?socket.getSession().getCurrentSequenceNumber():1;
receiveBuffer=new ReceiveBuffer(capacity,initialSequenceNum);
}
Modified: udt-java/skunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -169,7 +169,7 @@
packetHistoryWindow = new PacketHistoryWindow(16);
receiverLossList = new ReceiverLossList();
packetPairWindow = new PacketPairWindow(16);
- largestReceivedSeqNumber=session.getInitialSequenceNumber()-1;
+ largestReceivedSeqNumber=session.getCurrentSequenceNumber()-1;
bufferSize=session.getReceiveBufferSize();
handoffQueue=new ArrayBlockingQueue<UDTPacket>(4*session.getFlowWindowSize());
storeStatistics=Boolean.getBoolean("udt.receiver.storeStatistics");
@@ -396,7 +396,7 @@
// return;
// }
// //}
- boolean OK=session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData());
+ boolean OK= ((UDTInputStream) session.getSocket().getInputStream()).haveNewData(currentSequenceNumber,dp.getData());
if(!OK){
//need to drop packet...
return;
Modified: udt-java/skunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -68,9 +68,7 @@
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
private final UDPEndPoint endpoint;
-
private final UDTSession session;
-
private final UDTStatistics statistics;
//senderLossList stores the sequence numbers of lost packets
@@ -92,31 +90,31 @@
private final AtomicInteger unacknowledged=new AtomicInteger(0);
//for generating data packet sequence numbers
- private volatile long currentSequenceNumber=0;
+ //volatile long counters are not atomic.
+ //private volatile int currentSequenceNumber=0;
//the largest data packet sequence number that has actually been sent out
- private volatile long largestSentSequenceNumber=-1;
+ private volatile int largestSentSequenceNumber=-1;
//last acknowledge number, initialised to the initial sequence number
- private volatile long lastAckSequenceNumber;
+ private volatile int lastAckSequenceNumber;
private volatile boolean started=false;
-
private volatile boolean stopped=false;
-
private volatile boolean paused=false;
//used to signal that the sender should start to send
private volatile CountDownLatch startLatch=new CountDownLatch(1);
//used by the sender to wait for an ACK
- private final AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>();
+ private final AtomicReference<CountDownLatch> waitForAckLatch
+ =new AtomicReference<CountDownLatch>();
//used by the sender to wait for an ACK of a certain sequence number
- private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>();
+ private final AtomicReference<CountDownLatch> waitForSeqAckLatch
+ =new AtomicReference<CountDownLatch>();
private final boolean storeStatistics;
-
private final int chunksize;
public UDTSender(UDTSession session,UDPEndPoint endpoint){
@@ -128,8 +126,8 @@
sendBuffer=new ConcurrentHashMap<Long, byte[]>(session.getFlowWindowSize(),0.75f,2);
chunksize=session.getDatagramSize()-24;//need space for the header;
flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize);
- lastAckSequenceNumber=session.getInitialSequenceNumber();
- currentSequenceNumber=session.getInitialSequenceNumber()-1;
+ lastAckSequenceNumber=session.getCurrentSequenceNumber();
+// currentSequenceNumber=session.getCurrentSequenceNumber()-1;
waitForAckLatch.set(new CountDownLatch(1));
waitForSeqAckLatch.set(new CountDownLatch(1));
storeStatistics=Boolean.getBoolean("udt.sender.storeStatistics");
@@ -212,6 +210,7 @@
protected void sendUdtPacket(ByteBuffer bb, int timeout, TimeUnit units)throws IOException, InterruptedException{
if(!started)start();
DataPacket packet=null;
+
do{
packet=flowWindow.getForProducer();
if(packet==null){
@@ -219,7 +218,7 @@
}
}while(packet==null);//TODO check timeout...
try{
- packet.setPacketSequenceNumber(getNextSequenceNumber());
+ packet.setPacketSequenceNumber(incrementAndGetSequenceNo());
packet.setSession(session);
packet.setDestinationID(session.getDestination().getSocketID());
int len=Math.min(bb.remaining(),chunksize);
@@ -253,7 +252,7 @@
}
}while(packet==null);
try{
- packet.setPacketSequenceNumber(getNextSequenceNumber());
+ packet.setPacketSequenceNumber(incrementAndGetSequenceNo());
packet.setSession(session);
packet.setDestinationID(session.getDestination().getSocketID());
packet.setData(data);
@@ -309,7 +308,7 @@
unacknowledged.decrementAndGet();
}
}
- lastAckSequenceNumber=Math.max(lastAckSequenceNumber, ackNumber);
+ lastAckSequenceNumber=(int) Math.max(lastAckSequenceNumber, ackNumber);
//send ACK2 packet to the receiver
sendAck2(ackNumber);
statistics.incNumberOfACKReceived();
@@ -362,22 +361,20 @@
Long entry=senderLossList.getFirstEntry();
if(entry!=null){
handleRetransmit(entry);
- }
- else
- {
+ }else{
//if the number of unacknowledged data packets does not exceed the congestion
//and the flow window sizes, pack a new packet
int unAcknowledged=unacknowledged.get();
-
- if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
- && unAcknowledged<session.getFlowWindowSize()){
+ if(unAcknowledged<session.getCongestionControl()
+ .getCongestionWindowSize()
+ && unAcknowledged<session.getFlowWindowSize())
+ {
//check for application data
DataPacket dp=flowWindow.consumeData();
if(dp!=null){
send(dp);
- largestSentSequenceNumber=dp.getPacketSequenceNumber();
- }
- else{
+ largestSentSequenceNumber=(int) dp.getPacketSequenceNumber();
+ }else{
statistics.incNumberOfMissingDataEvents();
}
}else{
@@ -388,7 +385,6 @@
waitForAck();
}
}
-
//wait
if(largestSentSequenceNumber % 16 !=0){
long snd=(long)session.getCongestionControl().getSendInterval();
@@ -445,13 +441,12 @@
* the next sequence number for data packets.
* The initial sequence number is "0"
*/
- public long getNextSequenceNumber(){
- currentSequenceNumber=SequenceNumber.increment(currentSequenceNumber);
- return currentSequenceNumber;
+ public int incrementAndGetSequenceNo(){
+ return session.incrementAndGetSequenceNo();
}
- public long getCurrentSequenceNumber(){
- return currentSequenceNumber;
+ public int getCurrentSequenceNumber(){
+ return session.getCurrentSequenceNumber();
}
/**
Modified: udt-java/skunk/src/main/java/udt/UDTServerSocket.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -31,35 +31,48 @@
*********************************************************************************/
package udt;
+import java.io.IOException;
import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
import java.util.logging.Logger;
-public class UDTServerSocket {
+public class UDTServerSocket extends ServerSocket {
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
- private final UDPEndPoint endpoint;
+ private volatile UDPEndPoint endpoint;
+ private volatile InetAddress localAdd;
+ private volatile int locPort;
+ private volatile SocketAddress localSocketAddress;
- private boolean started=false;
-
+ private volatile boolean started=false;
+ private volatile boolean bound = false;
private volatile boolean shutdown=false;
/**
* create a UDT ServerSocket
- * @param localAddress
+ * @param localSocketAddress
* @param port - the local port. If 0, an ephemeral port will be chosen
*/
- public UDTServerSocket(InetAddress localAddress, int port)throws SocketException,UnknownHostException{
- endpoint=new UDPEndPoint(localAddress,port);
+ public UDTServerSocket(InetAddress localAddress, int port)throws UnknownHostException, IOException{
+ super();
+ endpoint= UDPEndPoint.get(localAddress,port);
+ localAdd = localAddress;
+ locPort = port;
+ bound = true;
logger.info("Created server endpoint on port "+endpoint.getLocalPort());
}
//starts a server on localhost
- public UDTServerSocket(int port)throws SocketException,UnknownHostException{
+ public UDTServerSocket(int port)throws IOException,UnknownHostException{
this(InetAddress.getLocalHost(),port);
}
@@ -68,13 +81,16 @@
* for the new connection
* @return
*/
- public synchronized UDTSocket accept()throws InterruptedException{
+@Override
+ public synchronized Socket accept() throws IOException{
if(!started){
endpoint.start(true);
started=true;
}
+ // TODO: use a blocking queue.
while(!shutdown){
- UDTSession session=endpoint.accept(10000, TimeUnit.MILLISECONDS);
+ try {
+ UDTSession session = endpoint.accept(10000, TimeUnit.MILLISECONDS, null);
if(session!=null){
//wait for handshake to complete
while(!session.isReady() || session.getSocket()==null){
@@ -82,16 +98,107 @@
}
return session.getSocket();
}
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
}
- throw new InterruptedException();
}
+ throw new IOException("UDTSession was null");
+ }
- public void shutDown(){
+ public UDPEndPoint getEndpoint(){
+ return endpoint;
+ }
+
+ @Override
+ public void bind(SocketAddress endpoint){
+ //TODO: Implement ServerSocket.bind
+ }
+
+ @Override
+ public void bind(SocketAddress endpoint, int timeout){
+ //TODO: Implement ServerSocket.bind
+ }
+
+ @Override
+ public void close(){
shutdown=true;
+ // TODO: The endpoint might have other ServerSocket's listening,
+ // we need to pass the endpoint the socket, or session or something
+ // the endpoint should only stop when it has no remaining sessions.
endpoint.stop();
}
- public UDPEndPoint getEndpoint(){
- return endpoint;
+ @Override
+ public ServerSocketChannel getChannel(){
+ return null;
}
+
+ @Override
+ public InetAddress getInetAddress(){
+ return localAdd;
}
+
+ @Override
+ public int getLocalPort(){
+ return locPort;
+ }
+
+ @Override
+ public SocketAddress getLocalSocketAddress(){
+ return localSocketAddress;
+ }
+
+ @Override
+ public int getReceiveBufferSize(){
+ return 0;
+ }
+
+ @Override
+ public boolean getReuseAddress(){
+ return false;
+ }
+
+ @Override
+ public int getSoTimeout(){
+ return 0;
+ }
+
+ @Override
+ public boolean isBound(){
+ return started;
+ }
+
+ @Override
+ public boolean isClosed(){
+ return shutdown;
+ }
+
+ @Override
+ public void setPerformancePreferences(int connectionTime, int latency, int bandwidth){
+
+ }
+
+ @Override
+ public void setReceiveBufferSize(int size){
+
+ }
+
+ @Override
+ public void setReuseAddress(boolean on){
+
+ }
+
+ @Override
+ public void setSoTimeout(int timeout){
+
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder(120);
+ sb.append("UDTServerSocket: \n");
+ //TODO: add statistics.
+ return sb.toString();
+ }
+
+}
Modified: udt-java/skunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -33,12 +33,16 @@
package udt;
import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
-import udt.packets.Destination;
+import udt.packets.UDTSocketAddress;
+import udt.util.SequenceNumber;
import udt.util.UDTStatistics;
public abstract class UDTSession {
@@ -79,7 +83,7 @@
/**
* remote UDT entity (address and socket ID)
*/
- protected final Destination destination;
+ protected final UDTSocketAddress destination;
/**
* local port
@@ -101,16 +105,19 @@
*/
protected int datagramSize=DEFAULT_DATAGRAM_SIZE;
- protected Long initialSequenceNumber=null;
+ protected int initialSequenceNumber=11;
- protected final long mySocketID;
+ private final AtomicInteger sequenceNo = new AtomicInteger(SequenceNumber.random());
- private final static AtomicLong nextSocketID=new AtomicLong(20+new Random().nextInt(5000));
+ protected final int mySocketID;
- public UDTSession(String description, Destination destination){
+ private final static AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000));
+
+ public UDTSession(String description, UDTSocketAddress dest){
+ InetSocketAddress inetAdd = null;
statistics=new UDTStatistics(description);
mySocketID=nextSocketID.incrementAndGet();
- this.destination=destination;
+ this.destination= dest;
this.dgPacket=new DatagramPacket(new byte[0],0,destination.getAddress(),destination.getPort());
String clazzP=System.getProperty(CC_CLASS,UDTCongestionControl.class.getName());
Object ccObject=null;
@@ -125,10 +132,8 @@
logger.info("Using "+cc.getClass().getName());
}
+ public abstract void received(UDTPacket packet, UDTSocketAddress peer);
- public abstract void received(UDTPacket packet, Destination peer);
-
-
public UDTSocket getSocket() {
return socket;
}
@@ -137,7 +142,7 @@
return cc;
}
- public int getState() {
+ protected int getState() {
return state;
}
@@ -149,7 +154,7 @@
this.socket = socket;
}
- public void setState(int state) {
+ protected void setState(int state) {
logger.info(toString()+" connection state CHANGED to <"+state+">");
this.state = state;
}
@@ -170,7 +175,7 @@
return state==shutdown || state==invalid;
}
- public Destination getDestination() {
+ public UDTSocketAddress getDestination() {
return destination;
}
@@ -202,21 +207,30 @@
return statistics;
}
- public long getSocketID(){
+ public int getSocketID(){
return mySocketID;
}
- public synchronized long getInitialSequenceNumber(){
- if(initialSequenceNumber==null){
- initialSequenceNumber=1l; //TODO must be random?
+ public int getCurrentSequenceNumber(){
+ return sequenceNo.get();
}
- return initialSequenceNumber;
+
+ public int incrementAndGetSequenceNo(){
+ while (true){
+ int sequence = sequenceNo.get();
+ int increment = SequenceNumber.increment(sequence);
+ boolean success = false;
+ success = sequenceNo.compareAndSet(sequence, increment);
+ if (success) return increment;
}
+ }
- public synchronized void setInitialSequenceNumber(long initialSequenceNumber){
- this.initialSequenceNumber=initialSequenceNumber;
+ protected void setInitialSequenceNumber(int initialSequenceNumber){
+ if (state == handshaking){
+ sequenceNo.set(initialSequenceNumber);
}
+ }
public DatagramPacket getDatagram(){
return dgPacket;
Modified: udt-java/skunk/src/main/java/udt/UDTSocket.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -32,10 +32,23 @@
package udt;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
import java.net.SocketException;
+import java.net.SocketOptions;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.security.AccessController;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import udt.packets.UDTSocketAddress;
/**
* UDTSocket is analogous to a normal java.net.Socket, it provides input and
* output streams for the application
@@ -43,34 +56,443 @@
* TODO is it possible to actually extend java.net.Socket ?
*
*/
-public class UDTSocket {
+public class UDTSocket extends Socket{
+ private static final ArrayList<UDTSocketAddress> boundSockets
+ = new ArrayList<UDTSocketAddress>(120);
+
//endpoint
- private final UDPEndPoint endpoint;
+ private volatile UDPEndPoint endpoint;
private volatile boolean active;
+ private volatile boolean connected;
+ private volatile boolean bound;
+ private volatile boolean shutIn; // receiver closed.
+ private volatile boolean shutOut; // sender closed.
+ private volatile boolean closed;
//processing received data
- private UDTReceiver receiver;
- private UDTSender sender;
+ private volatile UDTReceiver receiver;
+ private volatile UDTSender sender;
- private final UDTSession session;
+ private volatile UDTSession session;
- private UDTInputStream inputStream;
- private UDTOutputStream outputStream;
+ private volatile UDTInputStream inputStream;
+ private volatile UDTOutputStream outputStream;
+
+ private volatile UDTSocketAddress localSocketAddress;
+ private volatile UDTSocketAddress destination;
/**
+ * The session is usually the caller for this constructor
+ * so the session already knows this is the socket.
* @param host
* @param port
* @param endpoint
* @throws SocketException,UnknownHostException
*/
- public UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{
+ UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{
+ super();
this.endpoint=endpoint;
this.session=session;
this.receiver=new UDTReceiver(session,endpoint);
this.sender=new UDTSender(session,endpoint);
+ localSocketAddress = new UDTSocketAddress(endpoint.getLocalAddress(),
+ endpoint.getLocalPort(), session.getSocketID());
+ destination = session.getDestination();
+ bound = true;
}
+ public UDTSocket(InetAddress host, int port ) throws SocketException,
+ UnknownHostException{
+ super();
+ this.endpoint = UDPEndPoint.get(host, port);
+ this.session = null;
+ this.receiver = null;
+ this.sender = null;
+ active = false;
+ bound = true;
+
+ }
+
+ public UDTSocket(){
+ super();
+ endpoint = null;
+ session = null;
+ receiver = null;
+ sender = null;
+ active = false;
+ bound = false;
+ }
+
+ @Override
+ public void connect(SocketAddress destination) throws IOException {
+ connect(destination, 0);
+ }
+
+ @Override
+ public void connect(SocketAddress destination, int timeout) throws IOException {
+ if (destination == null) throw new IllegalArgumentException("connect: The address can't be null");
+ if (timeout < 0) throw new IllegalArgumentException("connect: timeout can't be negative");
+ if (isClosed()) throw new SocketException("Socket is closed");
+ if (isConnected()) throw new SocketException("already connected");
+ if (!(destination instanceof UDTSocketAddress))
+ throw new IllegalArgumentException("Unsupported address type");
+
+ UDTSocketAddress epoint = (UDTSocketAddress) destination;
+ InetAddress addr = epoint.getAddress();
+ int port = epoint.getPort();
+
+ SecurityManager security = System.getSecurityManager();
+ if (security != null) {
+ security.checkConnect(addr.getHostAddress(),port);
+ }
+ if (session == null){
+ session = new ClientSession(endpoint, (UDTSocketAddress) destination);
+ session.setSocket(this);
+ }
+ endpoint.addSession(session.getSocketID(), session);
+ receiver = new UDTReceiver(session, endpoint);
+ sender = new UDTSender(session, endpoint);
+ localSocketAddress = new UDTSocketAddress(endpoint.getLocalAddress(),
+ endpoint.getLocalPort(), session.getSocketID());
+ endpoint.start();
+ try {
+ ((ClientSession)session).connect();
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ destination = session.getDestination();
+ connected = true;
+ /*
+ * If the socket was not bound before the connect, it is now because
+ * the kernel will have picked an ephemeral port & a local address
+ */
+ bound = true;
+ }
+
+ /**
+ * Binds the socket to a local address.
+ * <P>
+ * If the address is <code>null</code>, then the system will pick up
+ * an ephemeral port and a valid local address to bind the socket.
+ *
+ * @param bindpoint the <code>SocketAddress</code> to bind to
+ * @throws IOException if the bind operation fails, or if the socket
+ * is already bound.
+ * @throws IllegalArgumentException if bindpoint is a
+ * SocketAddress subclass not supported by this socket
+ *
+ * @since 1.4
+ * @see #isBound
+ */
+ public void bind(SocketAddress bindpoint) throws IOException {
+ if (!(bindpoint instanceof UDTSocketAddress))
+ throw new IllegalArgumentException("Unsupported SocketAddress type");
+ if (isBound()) throw new IOException("Socket already bound");
+ synchronized (boundSockets) {
+ if (boundSockets.contains(bindpoint)) throw
+ new IOException("A socket is already bound to this address");
+ }
+ endpoint = UDPEndPoint.get(bindpoint);
+ if (endpoint == null) throw new SocketException("Failed to bind to UDPEndPoint");
+ bound = true;
+ }
+
+ @Override
+ public InetAddress getInetAddress() {
+ if (!isConnected()) return null;
+ return destination.getAddress();
+ }
+
+ // Inherit javadoc.
+ @Override
+ public InetAddress getLocalAddress() {
+ // This is for backward compatibility only, the super is not bound and
+ // returns InetAddress.anyLocalAddress();
+ if (!isBound()) super.getLocalAddress();
+ return localSocketAddress.getAddress();
+ }
+
+ // Inherit javadoc.
+ @Override
+ public int getPort() {
+ if (!isConnected()) return 0;
+ return destination.getPort();
+ }
+
+ // Inherit javadoc.
+ @Override
+ public int getLocalPort() {
+ if (!isBound()) return -1;
+ return localSocketAddress.getPort();
+ }
+
+ // Inherit javadoc.
+ @Override
+ public SocketAddress getRemoteSocketAddress() {
+ if (!isConnected()) return null;
+ return destination;
+ }
+
+ // Inherit javadoc.
+ @Override
+ public SocketAddress getLocalSocketAddress() {
+ if (!isBound()) return null;
+ return localSocketAddress;
+ }
+
+ @Override
+ public SocketChannel getChannel() {
+ return null;
+ }
+
+ /**
+ * Not supported
+ * @param on
+ * @throws SocketException
+ */
+ @Override
+ public void setTcpNoDelay(boolean on) throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ setOption(SocketOptions.TCP_NODELAY, Boolean.valueOf(on));
+ }
+
+ /**
+ * Not supported
+ * @return false
+ * @throws SocketException
+ */
+ @Override
+ public boolean getTcpNoDelay() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ return ((Boolean) getOption(SocketOptions.TCP_NODELAY)).booleanValue();
+ }
+
+ /**
+ * Not supported.
+ * @param on
+ * @param linger
+ * @throws SocketException
+ */
+ @Override
+ public void setSoLinger(boolean on, int linger) throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ if (linger < 0) throw new IllegalArgumentException("SO_LINGER cannot be less than zero");
+ // do nothing, not supported.
+ }
+
+ @Override
+ public int getSoLinger() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ return -1; // implies this option is disabled
+ }
+
+ // Sending of urgent data, should we support it?
+ @Override
+ public void sendUrgentData (int data) throws IOException {
+ throw new SocketException ("Urgent data not supported");
+ }
+
+ // Sending of urgent data, should we enable it?
+ @Override
+ public void setOOBInline(boolean on) throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ setOption(SocketOptions.SO_OOBINLINE, Boolean.valueOf(on));
+ }
+
+ @Override
+ public boolean getOOBInline() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ return ((Boolean) getOption(SocketOptions.SO_OOBINLINE)).booleanValue();
+ }
+
+ // TODO: implement set socket timeout.
+ @Override
+ public void setSoTimeout(int timeout) throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ if (timeout < 0) throw new IllegalArgumentException("negative timeout not allowed");
+ setOption(SocketOptions.SO_TIMEOUT, new Integer(timeout));
+ }
+
+ // TODO: Implement get socket timeout.
+ @Override
+ public synchronized int getSoTimeout() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ Object o = getOption(SocketOptions.SO_TIMEOUT);
+ if (o instanceof Integer) return ((Integer) o).intValue();
+ return 0;
+ }
+
+ // object method signature compatibility only, not currently supported.
+ @Override
+ public void setSendBufferSize(int size)
+ throws SocketException{
+ if (!(size > 0)) throw new IllegalArgumentException("negative send size not allowed");
+ if (isClosed()) throw new SocketException("Socket closed");
+ setOption(SocketOptions.SO_SNDBUF, new Integer(size));
+ }
+
+ // object method signature compatibility only, not currently supported.
+ @Override
+ public int getSendBufferSize() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket is closed");
+ int result = 0;
+ Object o = getOption(SocketOptions.SO_SNDBUF);
+ if (o instanceof Integer) result = ((Integer)o).intValue();
+ return result;
+ }
+
+ // object method signature compatibility only, not currently supported.
+ @Override
+ public void setReceiveBufferSize(int size)
+ throws SocketException{
+ if (size <= 0) throw new IllegalArgumentException("invalid receive size");
+ if (isClosed()) throw new SocketException("Socket closed");
+ setOption(SocketOptions.SO_RCVBUF, new Integer(size));
+ }
+
+ // object method signature compatibility only, not currently supported.
+ @Override
+ public int getReceiveBufferSize()
+ throws SocketException{
+ if (isClosed()) throw new SocketException("Socket closed");
+ int result = 0;
+ Object o = getOption(SocketOptions.SO_RCVBUF);
+ if (o instanceof Integer) {
+ result = ((Integer)o).intValue();
+ }
+ return result;
+ }
+
+ // TODO: Implement keep alive.
+ @Override
+ public void setKeepAlive(boolean on) throws SocketException {
+ if (isClosed())
+ throw new SocketException("Socket is closed");
+ setOption(SocketOptions.SO_KEEPALIVE, Boolean.valueOf(on));
+ }
+
+
+ @Override
+ public boolean getKeepAlive() throws SocketException {
+ // Keep alive is not currently implemented in the udt session.
+ if (isClosed())
+ throw new SocketException("Socket is closed");
+ return ((Boolean) getOption(SocketOptions.SO_KEEPALIVE)).booleanValue();
+ }
+
+ public void setTrafficClass(int tc) throws SocketException {
+ // Safe to ignore, not supported.
+ }
+
+ public int getTrafficClass() throws SocketException {
+ // Call redirected to underlying DatagramSocket.
+ return endpoint.getSocket().getTrafficClass();
+ }
+
+ @Override
+ public void setReuseAddress(boolean on) throws SocketException {
+ throw new SocketException("SO_REUSEADDR not supported");
+ }
+
+ /**
+ * Tests if SO_REUSEADDR is enabled.
+ *
+ * @return a <code>boolean</code> indicating whether or not SO_REUSEADDR is enabled.
+ * @exception SocketException if there is an error
+ * in the underlying protocol, such as a TCP error.
+ * @since 1.4
+ * @see #setReuseAddress(boolean)
+ */
+ @Override
+ public boolean getReuseAddress() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket is closed");
+ return ((Boolean) (getOption(SocketOptions.SO_REUSEADDR))).booleanValue();
+ }
+
+ /**
+ * Places the input stream for this socket at "end of stream".
+ * Any data sent to the input stream side of the socket is acknowledged
+ * and then silently discarded.
+ * <p>
+ * If you read from a socket input stream after invoking
+ * shutdownInput() on the socket, the stream will return EOF.
+ *
+ * @exception IOException if an I/O error occurs when shutting down this
+ * socket.
+ *
+ * @since 1.3
+ * @see java.net.Socket#shutdownOutput()
+ * @see java.net.Socket#close()
+ * @see java.net.Socket#setSoLinger(boolean, int)
+ * @see #isInputShutdown
+ */
+ @Override
+ public void shutdownInput() throws IOException
+ {
+ if (isClosed()) throw new SocketException("Socket closed");
+ if (!isConnected()) throw new SocketException("Socket not connected");
+ if (isInputShutdown()) throw new SocketException("Socket input already shutdown");
+ receiver.stop();
+ inputStream.close();
+ shutIn = true;
+ }
+
+ @Override
+ public void shutdownOutput() throws IOException
+ {
+ if (isClosed()) throw new SocketException("Socket closed");
+ if (!isConnected()) throw new SocketException("Socket is not connected");
+ if (isOutputShutdown()) throw new SocketException("Socket output is already shutdown");
+ sender.stop();
+ outputStream.close();
+ shutOut = true;
+ }
+
+ /**
+ * Returns the connection state of the socket.
+ *
+ * @return true if the socket successfully connected to a server
+ * @since 1.4
+ */
+ @Override
+ public boolean isConnected() {
+ // Before 1.3 Sockets were always connected during creation
+ return connected;
+ }
+
+ @Override
+ public boolean isBound() {
+ return bound ;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public boolean isInputShutdown() {
+ return shutIn;
+ }
+
+ /**
+ * Returns whether the write-half of the socket connection is closed.
+ *
+ * @return true if the output of the socket has been shutdown
+ * @since 1.4
+ * @see #shutdownOutput
+ */
+ @Override
+ public boolean isOutputShutdown() {
+ return shutOut;
+ }
+
+ // some preliminary support for socket options.
+ private Object getOption(int optID) {
+ return Boolean.FALSE;
+ }
+
public UDTReceiver getReceiver() {
return receiver;
}
@@ -87,9 +509,9 @@
this.sender = sender;
}
- public void setActive(boolean active) {
- this.active = active;
- }
+// public void setActive(boolean active) {
+// this.active = active;
+// }
public boolean isActive() {
return active;
@@ -103,7 +525,8 @@
* get the input stream for reading from this socket
* @return
*/
- public synchronized UDTInputStream getInputStream()throws IOException{
+ @Override
+ public synchronized InputStream getInputStream()throws IOException{
if(inputStream==null){
inputStream=new UDTInputStream(this);
}
@@ -114,7 +537,8 @@
* get the output stream for writing to this socket
* @return
*/
- public synchronized UDTOutputStream getOutputStream(){
+ @Override
+ public synchronized OutputStream getOutputStream(){
if(outputStream==null){
outputStream=new UDTOutputStream(this);
}
@@ -126,7 +550,7 @@
}
/**
- * write single block of data without waiting for any acknowledgement
+ * write single block of data without waiting for any acknowledgment
* @param data
*/
protected void doWrite(byte[]data)throws IOException{
@@ -176,6 +600,8 @@
/**
* will block until the outstanding packets have really been sent out
* and acknowledged
+ *
+ * @throws InterruptedException
*/
protected void flush() throws InterruptedException{
if(!active)return;
@@ -200,14 +626,38 @@
flush();
}
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder(400);
+ sb .append("UDTSocket: \n")
+ .append("Local address: ")
+ ...
[truncated message content] |
|
From: <pe...@us...> - 2011-08-05 01:14:34
|
Revision: 59
http://udt-java.svn.sourceforge.net/udt-java/?rev=59&view=rev
Author: pete_
Date: 2011-08-05 01:14:28 +0000 (Fri, 05 Aug 2011)
Log Message:
-----------
Skunk experimental branch
Added Paths:
-----------
udt-java/skunk/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <pe...@us...> - 2011-08-05 01:13:57
|
Revision: 58
http://udt-java.svn.sourceforge.net/udt-java/?rev=58&view=rev
Author: pete_
Date: 2011-08-05 01:13:51 +0000 (Fri, 05 Aug 2011)
Log Message:
-----------
Skunk experimental branch
Removed Paths:
-------------
udt-java/skunk/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <pe...@us...> - 2011-08-05 01:12:04
|
Revision: 57
http://udt-java.svn.sourceforge.net/udt-java/?rev=57&view=rev
Author: pete_
Date: 2011-08-05 01:11:58 +0000 (Fri, 05 Aug 2011)
Log Message:
-----------
Skunk experimental branch
Added Paths:
-----------
udt-java/skunk/trunk/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <pe...@us...> - 2011-08-05 01:09:47
|
Revision: 56
http://udt-java.svn.sourceforge.net/udt-java/?rev=56&view=rev
Author: pete_
Date: 2011-08-05 01:09:41 +0000 (Fri, 05 Aug 2011)
Log Message:
-----------
Move skunk down in directory tree
Added Paths:
-----------
udt-java/skunk/
Removed Paths:
-------------
skunk/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <pe...@us...> - 2011-08-05 01:04:17
|
Revision: 55
http://udt-java.svn.sourceforge.net/udt-java/?rev=55&view=rev
Author: pete_
Date: 2011-08-05 01:04:11 +0000 (Fri, 05 Aug 2011)
Log Message:
-----------
Skunk experimental branch
Added Paths:
-----------
skunk/
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2011-02-17 21:24:39
|
Revision: 54
http://udt-java.svn.sourceforge.net/udt-java/?rev=54&view=rev
Author: bschuller
Date: 2011-02-17 21:24:32 +0000 (Thu, 17 Feb 2011)
Log Message:
-----------
bit of refactoring of sender to avoid memory allocations for data and data packets
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/UDTSession.java
udt-java/trunk/src/main/java/udt/UDTSocket.java
udt-java/trunk/src/main/java/udt/packets/DataPacket.java
udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
Added Paths:
-----------
udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
udt-java/trunk/src/test/java/udt/sender/
udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -168,6 +168,7 @@
}
};
Thread t=UDTThreadFactory.get().newThread(receive);
+ t.setName("UDPEndpoint-"+t.getName());
t.setDaemon(true);
t.start();
logger.info("UDTEndpoint started.");
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -214,6 +214,8 @@
}
};
receiverThread=UDTThreadFactory.get().newThread(r);
+ String s=(session instanceof ServerSession)? "ServerSession": "ClientSession";
+ receiverThread.setName("UDTReceiver-"+s+"-"+receiverThread.getName());
receiverThread.start();
}
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -33,9 +33,8 @@
package udt;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -49,6 +48,7 @@
import udt.packets.DataPacket;
import udt.packets.KeepAlive;
import udt.packets.NegativeAcknowledgement;
+import udt.sender.FlowWindow;
import udt.sender.SenderLossList;
import udt.util.MeanThroughput;
import udt.util.MeanValue;
@@ -76,13 +76,12 @@
//senderLossList stores the sequence numbers of lost packets
//fed back by the receiver through NAK packets
private final SenderLossList senderLossList;
-
+
//sendBuffer stores the sent data packets and their sequence numbers
- private final Map<Long,DataPacket>sendBuffer;
-
- //sendQueue contains the packets to send
- private final BlockingQueue<DataPacket>sendQueue;
-
+ private final Map<Long,byte[]>sendBuffer;
+
+ private final FlowWindow flowWindow;
+
//thread reading packets from send queue and sending them
private Thread senderThread;
@@ -117,15 +116,18 @@
private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>();
private final boolean storeStatistics;
-
+
+ private final int chunksize;
+
public UDTSender(UDTSession session,UDPEndPoint endpoint){
if(!session.isReady())throw new IllegalStateException("UDTSession is not ready.");
this.endpoint= endpoint;
this.session=session;
statistics=session.getStatistics();
senderLossList=new SenderLossList();
- sendBuffer=new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(),0.75f,2);
- sendQueue = new ArrayBlockingQueue<DataPacket>(session.getFlowWindowSize(), /*fairness*/ true);
+ sendBuffer=new ConcurrentHashMap<Long, byte[]>(session.getFlowWindowSize(),0.75f,2);
+ chunksize=session.getDatagramSize()-24;//need space for the header;
+ flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize);
lastAckSequenceNumber=session.getInitialSequenceNumber();
currentSequenceNumber=session.getInitialSequenceNumber()-1;
waitForAckLatch.set(new CountDownLatch(1));
@@ -179,16 +181,14 @@
}
};
senderThread=UDTThreadFactory.get().newThread(r);
+ String s=(session instanceof ServerSession)? "ServerSession": "ClientSession";
+ senderThread.setName("UDTSender-"+s+"-"+senderThread.getName());
senderThread.start();
}
/**
* sends the given data packet, storing the relevant information
- *
- * @param data
- * @throws IOException
- * @throws InterruptedException
*/
private void send(DataPacket p)throws IOException{
synchronized(sendLock){
@@ -203,28 +203,63 @@
throughput.end();
throughput.begin();
}
- sendBuffer.put(p.getPacketSequenceNumber(), p);
+ sendBuffer.put(p.getPacketSequenceNumber(), p.getData());
unacknowledged.incrementAndGet();
}
statistics.incNumberOfSentDataPackets();
}
+ protected void sendUdtPacket(ByteBuffer bb, int timeout, TimeUnit units)throws IOException, InterruptedException{
+ if(!started)start();
+ DataPacket packet=null;
+ do{
+ packet=flowWindow.getForProducer();
+ if(packet==null){
+ Thread.sleep(10);
+ }
+ }while(packet==null);//TODO check timeout...
+ try{
+ packet.setPacketSequenceNumber(getNextSequenceNumber());
+ packet.setSession(session);
+ packet.setDestinationID(session.getDestination().getSocketID());
+ int len=Math.min(bb.remaining(),chunksize);
+ byte[] data=packet.getData();
+ bb.get(data,0,len);
+ packet.setLength(len);
+ }finally{
+ flowWindow.produce();
+ }
+
+ }
+
/**
- * writes a data packet into the sendQueue, waiting at most for the specified time
+ * writes a data packet, waiting at most for the specified time
* if this is not possible due to a full send queue
*
- * @return <code>true</code>if the packet was added, <code>false</code> if the
- * packet could not be added because the queue was full
- * @param p
* @param timeout
* @param units
* @return
* @throws IOException
* @throws InterruptedException
*/
- protected boolean sendUdtPacket(DataPacket p, int timeout, TimeUnit units)throws IOException,InterruptedException{
+ protected void sendUdtPacket(byte[]data, int timeout, TimeUnit units)throws IOException, InterruptedException{
if(!started)start();
- return sendQueue.offer(p,timeout,units);
+ DataPacket packet=null;
+ do{
+ packet=flowWindow.getForProducer();
+ if(packet==null){
+ Thread.sleep(10);
+ // System.out.println("queue full: "+flowWindow);
+ }
+ }while(packet==null);
+ try{
+ packet.setPacketSequenceNumber(getNextSequenceNumber());
+ packet.setSession(session);
+ packet.setDestinationID(session.getDestination().getSocketID());
+ packet.setData(data);
+ }finally{
+ flowWindow.produce();
+ }
}
//receive a packet from server from the peer
@@ -268,6 +303,7 @@
for(long s=lastAckSequenceNumber;s<ackNumber;s++){
synchronized (sendLock) {
removed=sendBuffer.remove(s)!=null;
+ senderLossList.remove(s);
}
if(removed){
unacknowledged.decrementAndGet();
@@ -291,7 +327,7 @@
session.getCongestionControl().onLoss(nak.getDecodedLossInfo());
session.getSocket().getReceiver().resetEXPTimer();
statistics.incNumberOfNAKReceived();
-
+
if(logger.isLoggable(Level.FINER)){
logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, "
+"set send period to "+session.getCongestionControl().getSendInterval());
@@ -322,13 +358,11 @@
public void senderAlgorithm()throws InterruptedException, IOException{
while(!paused){
iterationStart=Util.getCurrentTime();
-
//if the sender's loss list is not empty
- if (!senderLossList.isEmpty()) {
- Long entry=senderLossList.getFirstEntry();
- handleResubmit(entry);
+ Long entry=senderLossList.getFirstEntry();
+ if(entry!=null){
+ handleRetransmit(entry);
}
-
else
{
//if the number of unacknowledged data packets does not exceed the congestion
@@ -336,9 +370,9 @@
int unAcknowledged=unacknowledged.get();
if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
- && unAcknowledged<session.getFlowWindowSize()){
+ && unAcknowledged<session.getFlowWindowSize()){
//check for application data
- DataPacket dp=sendQueue.poll();
+ DataPacket dp=flowWindow.consumeData();
if(dp!=null){
send(dp);
largestSentSequenceNumber=dp.getPacketSequenceNumber();
@@ -374,15 +408,21 @@
}
/**
- * re-submits an entry from the sender loss list
+ * re-transmit an entry from the sender loss list
* @param entry
*/
- protected void handleResubmit(Long seqNumber){
+ protected void handleRetransmit(Long seqNumber){
try {
//retransmit the packet and remove it from the list
- DataPacket pktToRetransmit = sendBuffer.get(seqNumber);
- if(pktToRetransmit!=null){
- endpoint.doSend(pktToRetransmit);
+ byte[]data=sendBuffer.get(seqNumber);
+ if(data!=null){
+ //System.out.println("re-transmit "+data);
+ DataPacket packet=new DataPacket();
+ packet.setPacketSequenceNumber(seqNumber);
+ packet.setSession(session);
+ packet.setDestinationID(session.getDestination().getSocketID());
+ packet.setData(data);
+ endpoint.doSend(packet);
statistics.incNumberOfRetransmittedDataPackets();
}
}catch (Exception e) {
@@ -457,14 +497,14 @@
*/
public void waitForAck()throws InterruptedException{
waitForAckLatch.set(new CountDownLatch(1));
- waitForAckLatch.get().await(2, TimeUnit.MILLISECONDS);
+ waitForAckLatch.get().await(200, TimeUnit.MICROSECONDS);
}
public void stop(){
stopped=true;
}
-
+
public void pause(){
startLatch=new CountDownLatch(1);
paused=true;
Modified: udt-java/trunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSession.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/UDTSession.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -74,7 +74,7 @@
* flow window size, i.e. how many data packets are
* in-flight at a single time
*/
- protected int flowWindowSize=1024;
+ protected int flowWindowSize=1024*10;
/**
* remote UDT entity (address and socket ID)
Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSocket.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -36,46 +36,41 @@
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
-
-import udt.packets.DataPacket;
-
/**
* UDTSocket is analogous to a normal java.net.Socket, it provides input and
* output streams for the application
*
* TODO is it possible to actually extend java.net.Socket ?
*
- *
*/
public class UDTSocket {
-
+
//endpoint
private final UDPEndPoint endpoint;
-
+
private volatile boolean active;
-
- //processing received data
+
+ //processing received data
private UDTReceiver receiver;
private UDTSender sender;
-
+
private final UDTSession session;
private UDTInputStream inputStream;
private UDTOutputStream outputStream;
-
/**
- * @param host
- * @param port
- * @param endpoint
- * @throws SocketException,UnknownHostException
- */
+ * @param host
+ * @param port
+ * @param endpoint
+ * @throws SocketException,UnknownHostException
+ */
public UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{
this.endpoint=endpoint;
this.session=session;
this.receiver=new UDTReceiver(session,endpoint);
this.sender=new UDTSender(session,endpoint);
}
-
+
public UDTReceiver getReceiver() {
return receiver;
}
@@ -114,7 +109,7 @@
}
return inputStream;
}
-
+
/**
* get the output stream for writing to this socket
* @return
@@ -125,20 +120,20 @@
}
return outputStream;
}
-
+
public final UDTSession getSession(){
return session;
}
-
+
/**
* write single block of data without waiting for any acknowledgement
* @param data
*/
protected void doWrite(byte[]data)throws IOException{
doWrite(data, 0, data.length);
-
+
}
-
+
/**
* write the given data
* @param data - the data array
@@ -148,14 +143,14 @@
*/
protected void doWrite(byte[]data, int offset, int length)throws IOException{
try{
- doWrite(data, offset, length, Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
+ doWrite(data, offset, length, 10, TimeUnit.MILLISECONDS);
}catch(InterruptedException ie){
IOException io=new IOException();
io.initCause(ie);
throw io;
}
}
-
+
/**
* write the given data, waiting at most for the specified time if the queue is full
* @param data
@@ -167,26 +162,17 @@
* @throws InterruptedException
*/
protected void doWrite(byte[]data, int offset, int length, int timeout, TimeUnit units)throws IOException,InterruptedException{
- int chunksize=session.getDatagramSize()-24;//need some bytes for the header
ByteBuffer bb=ByteBuffer.wrap(data,offset,length);
- long seqNo=0;
while(bb.remaining()>0){
- int len=Math.min(bb.remaining(),chunksize);
- byte[]chunk=new byte[len];
- bb.get(chunk);
- DataPacket packet=new DataPacket();
- seqNo=sender.getNextSequenceNumber();
- packet.setPacketSequenceNumber(seqNo);
- packet.setSession(session);
- packet.setDestinationID(session.getDestination().getSocketID());
- packet.setData(chunk);
- //put the packet into the send queue
- if(!sender.sendUdtPacket(packet, timeout, units)){
- throw new IOException("Queue full");
+ try{
+ sender.sendUdtPacket(bb, timeout, units);
+ }catch(Exception ex){
+ ex.printStackTrace();
}
}
if(length>0)active=true;
}
+
/**
* will block until the outstanding packets have really been sent out
* and acknowledged
@@ -207,13 +193,13 @@
//TODO need to check if we can pause the sender...
//sender.pause();
}
-
+
//writes and wait for ack
protected void doWriteBlocking(byte[]data)throws IOException, InterruptedException{
doWrite(data);
flush();
}
-
+
/**
* close the connection
* @throws IOException
Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -44,12 +44,15 @@
private long destinationID;
private UDTSession session;
+
+ private int dataLength;
public DataPacket(){
}
/**
- * create a DataPacket
+ * create a DataPacket from the given raw data
+ *
* @param encodedData - network data
*/
public DataPacket(byte[] encodedData){
@@ -58,6 +61,7 @@
public DataPacket(byte[] encodedData, int length){
decode(encodedData,length);
+ dataLength=length;
}
void decode(byte[]encodedData,int length){
@@ -75,16 +79,16 @@
}
public double getLength(){
- return data.length;
+ return dataLength;
}
- /*
- * aplivation data
- * @param
- */
-
+ public void setLength(int length){
+ dataLength=length;
+ }
+
public void setData(byte[] data) {
this.data = data;
+ dataLength=data.length;
}
public long getPacketSequenceNumber() {
@@ -125,12 +129,12 @@
*/
public byte[] getEncoded(){
//header.length is 16
- byte[] result=new byte[16+data.length];
+ byte[] result=new byte[16+dataLength];
System.arraycopy(PacketUtil.encode(packetSequenceNumber), 0, result, 0, 4);
System.arraycopy(PacketUtil.encode(messageNumber), 0, result, 4, 4);
System.arraycopy(PacketUtil.encode(timeStamp), 0, result, 8, 4);
System.arraycopy(PacketUtil.encode(destinationID), 0, result, 12, 4);
- System.arraycopy(data, 0, result, 16, data.length);
+ System.arraycopy(data, 0, result, 16, dataLength);
return result;
}
Modified: udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -39,12 +39,17 @@
*/
public class PacketHistoryWindow extends CircularArray<Long>{
+ private final long[]intervals;
+ private final int num;
+
/**
* create a new PacketHistoryWindow of the given size
* @param size
*/
public PacketHistoryWindow(int size){
super(size);
+ num=max-1;
+ intervals=new long[num];
}
/**
@@ -54,12 +59,11 @@
*/
public long getPacketArrivalSpeed(){
if(!haveOverflow)return 0;
- int num=max-1;
+
double AI;
double medianPacketArrivalSpeed;
double total=0;
int count=0;
- long[]intervals=new long[num];
int pos=position-1;
if(pos<0)pos=num;
do{
Added: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java (rev 0)
+++ udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -0,0 +1,139 @@
+package udt.sender;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import udt.packets.DataPacket;
+
+/**
+ *
+ * holds a fixed number of {@link DataPacket} instances which are sent out.
+ *
+ * it is assumed that a single thread stores new data, and another single thread
+ * reads/removes data
+ *
+ * @author schuller
+ */
+public class FlowWindow {
+
+ private final DataPacket[]packets;
+
+ private final int length;
+
+ private volatile boolean isEmpty=true;
+
+ private volatile boolean isFull=false;
+
+ private volatile int validEntries=0;
+
+ private volatile boolean isCheckout=false;
+
+ private volatile int writePos=0;
+
+ private volatile int readPos=-1;
+
+ private volatile int consumed=0;
+
+ private volatile int produced=0;
+
+ private final ReentrantLock lock;
+
+ /**
+ * @param size - flow window size
+ * @param chunksize - data chunk size
+ */
+ public FlowWindow(int size, int chunksize){
+ this.length=size;
+ packets=new DataPacket[length];
+ for(int i=0;i<packets.length;i++){
+ packets[i]=new DataPacket();
+ packets[i].setData(new byte[chunksize]);
+ }
+ lock=new ReentrantLock(true);
+ }
+
+ /**
+ * get a data packet for updating with new data
+ *
+ * @return <code>null</code> if flow window is full
+ */
+ public DataPacket getForProducer(){
+ lock.lock();
+ try{
+ if(isFull){
+ return null;
+ }
+ if(isCheckout)throw new IllegalStateException();
+ isCheckout=true;
+ DataPacket p=packets[writePos];
+ return p;
+ }finally{
+ lock.unlock();
+ }
+ }
+
+ public void produce(){
+ lock.lock();
+ try{
+ isCheckout=false;
+ writePos++;
+ if(writePos==length)writePos=0;
+ validEntries++;
+ isFull=validEntries==length-1;
+ isEmpty=false;
+ produced++;
+ }finally{
+ lock.unlock();
+ }
+ }
+
+
+ public DataPacket consumeData(){
+ if(isEmpty){
+ return null;
+ }
+ lock.lock();
+ try{
+ readPos++;
+ DataPacket p=packets[readPos];
+ if(readPos==length-1)readPos=-1;
+ validEntries--;
+ isEmpty=validEntries==0;
+ isFull=false;
+ consumed++;
+ return p;
+ }finally{
+ lock.unlock();
+ }
+ }
+
+ boolean isEmpty(){
+ return isEmpty;
+ }
+
+ /**
+ * check if another entry can be added
+ * @return
+ */
+ public boolean isFull(){
+ return isFull;
+ }
+
+ int readPos(){
+ return readPos;
+ }
+
+ int writePos(){
+ return writePos;
+ }
+
+ int consumed(){
+ return consumed;
+ }
+ public String toString(){
+ StringBuilder sb=new StringBuilder();
+ sb.append("FlowWindow size=").append(length);
+ sb.append(" full=").append(isFull).append(" empty=").append(isEmpty);
+ sb.append(" consumed=").append(consumed).append(" produced=").append(produced);
+ return sb.toString();
+ }
+}
Property changes on: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -33,8 +33,6 @@
package udt.sender;
import java.util.LinkedList;
-import udt.util.MeanValue;
-
/**
* stores the sequence number of the lost packets in increasing order
*/
@@ -57,14 +55,19 @@
backingList.add(i,obj);
return;
}
- else if(obj==entry)return;
+ else if(obj.equals(entry))return;
}
backingList.add(obj);
}
}
+ public void remove(Long obj){
+ synchronized (backingList) {
+ backingList.remove(obj);
+ }
+ }
/**
- * retrieves the loss list entry with the lowest sequence number
+ * retrieves the loss list entry with the lowest sequence number, or <code>null</code> if loss list is empty
*/
public Long getFirstEntry(){
synchronized(backingList){
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -109,7 +109,6 @@
while(true)Thread.sleep(10000);
}
-
File file=new File(new String(localFile));
System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">");
FileOutputStream fos=new FileOutputStream(file);
Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -188,7 +188,7 @@
sb.append("Duplicate data packets: ").append(getNumberOfDuplicateDataPackets()).append("\n");
sb.append("ACK received: ").append(getNumberOfACKReceived()).append("\n");
sb.append("NAK received: ").append(getNumberOfNAKReceived()).append("\n");
- sb.append("Retransmitted data: ").append(getNumberOfNAKReceived()).append("\n");
+ sb.append("Retransmitted data: ").append(getNumberOfRetransmittedDataPackets()).append("\n");
sb.append("NAK sent: ").append(getNumberOfNAKSent()).append("\n");
sb.append("ACK sent: ").append(getNumberOfACKSent()).append("\n");
if(roundTripTime>0){
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-01-06 16:13:32 UTC (rev 53)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -21,7 +21,7 @@
boolean running=false;
//how many
- int num_packets=300;
+ int num_packets=100;
//how large is a single packet
int size=1*1024*1024;
@@ -55,7 +55,7 @@
new Random().nextBytes(data);
MessageDigest digest=MessageDigest.getInstance("MD5");
- while(!serverRunning)Thread.sleep(100);
+ while(!serverStarted)Thread.sleep(100);
long start=System.currentTimeMillis();
System.out.println("Sending <"+num_packets+"> packets of <"+format.format(size/1024.0/1024.0)+"> Mbytes each");
long end=0;
@@ -101,6 +101,7 @@
long total=0;
volatile boolean serverRunning=true;
+ volatile boolean serverStarted=false;
volatile String md5_received=null;
@@ -110,6 +111,7 @@
Runnable serverProcess=new Runnable(){
public void run(){
+
try{
Boolean devNull=Boolean.getBoolean("udt.dev.null");
if(devNull){
@@ -118,6 +120,7 @@
MessageDigest md5=MessageDigest.getInstance("MD5");
long start=System.currentTimeMillis();
UDTSocket s=serverSocket.accept();
+ serverStarted=true;
assertNotNull(s);
UDTInputStream is=s.getInputStream();
byte[]buf=new byte[READ_BUFFERSIZE];
Added: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
===================================================================
--- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java (rev 0)
+++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2011-02-17 21:24:32 UTC (rev 54)
@@ -0,0 +1,151 @@
+package udt.sender;
+
+import java.util.concurrent.TimeoutException;
+
+import junit.framework.TestCase;
+import udt.packets.DataPacket;
+
+public class TestFlowWindow extends TestCase {
+
+ public void testFillWindow()throws InterruptedException, TimeoutException{
+ FlowWindow fw=new FlowWindow(3, 128);
+ DataPacket p1=fw.getForProducer();
+ assertNotNull(p1);
+ fw.produce();
+ DataPacket p2=fw.getForProducer();
+ assertNotNull(p2);
+ fw.produce();
+ assertFalse(p1==p2);
+ DataPacket p3=fw.getForProducer();
+ assertNotNull(p3);
+ assertFalse(p1==p3);
+ assertFalse(p2==p3);
+ fw.produce();
+ assertTrue(fw.isFull());
+
+ DataPacket no=fw.getForProducer();
+ assertNull("Window should be full",no);
+
+ DataPacket c1=fw.consumeData();
+ //must be p1
+ assertTrue(c1==p1);
+ DataPacket c2=fw.consumeData();
+ //must be p2
+ assertTrue(c2==p2);
+ DataPacket c3=fw.consumeData();
+ //must be p3
+ assertTrue(c3==p3);
+ assertTrue(fw.isEmpty());
+ }
+
+ public void testOverflow()throws InterruptedException, TimeoutException{
+ FlowWindow fw=new FlowWindow(3, 64);
+ DataPacket p1=fw.getForProducer();
+ assertNotNull(p1);
+ fw.produce();
+ DataPacket p2=fw.getForProducer();
+ assertNotNull(p2);
+ fw.produce();
+ assertFalse(p1==p2);
+ DataPacket p3=fw.getForProducer();
+ assertNotNull(p3);
+ assertFalse(p1==p3);
+ assertFalse(p2==p3);
+ fw.produce();
+ assertTrue(fw.isFull());
+
+ //read one
+ DataPacket c1=fw.consumeData();
+ //must be p1
+ assertTrue(c1==p1);
+ assertFalse(fw.isFull());
+
+ //now a slot for writing should be free again
+ DataPacket p4=fw.getForProducer();
+ assertNotNull(p4);
+ fw.produce();
+ //which is again p1
+ assertTrue(p4==p1);
+
+ }
+
+ private volatile boolean fail=false;
+
+ public void testConcurrentReadWrite()throws InterruptedException{
+ final FlowWindow fw=new FlowWindow(20, 64);
+ Thread reader=new Thread(new Runnable(){
+ public void run(){
+ doRead(fw);
+ }
+ });
+ reader.setName("reader");
+ Thread writer=new Thread(new Runnable(){
+ public void run(){
+ doWrite(fw);
+ }
+ });
+ writer.setName("writer");
+
+ writer.start();
+ reader.start();
+
+ int c=0;
+ while(read && write && c<10){
+ Thread.sleep(1000);
+ c++;
+ }
+ assertFalse("An error occured in reader or writer",fail);
+
+ }
+
+ volatile boolean read=true;
+ volatile boolean write=true;
+ int N=100000;
+
+ private void doRead(final FlowWindow fw){
+ System.out.println("Starting reader...");
+ try{
+ for(int i=0;i<N;i++){
+ DataPacket p=null;
+ while( (p=fw.consumeData())==null){
+ Thread.sleep(1);
+ }
+ synchronized (p) {
+ assertEquals(i,p.getMessageNumber());
+ }
+ }
+ }catch(Throwable ex){
+ ex.printStackTrace();
+ fail=true;
+ }
+ System.out.println("Exiting reader...");
+ read=false;
+ }
+
+ private void doWrite(final FlowWindow fw){
+ System.out.println("Starting writer...");
+ DataPacket p=null;
+ try{
+ for(int i=0;i<N;i++){
+ p=null;
+ do{
+ p=fw.getForProducer();
+ if(p!=null){
+ synchronized(p){
+ p.setData(("test"+i).getBytes());
+ p.setMessageNumber(i);
+ fw.produce();
+ }
+ }
+ }while(p==null);
+ }
+ }catch(Exception ex){
+ ex.printStackTrace();
+ System.out.println("ERROR****");
+ fail=true;
+ }
+ System.out.println("Exiting writer...");
+ write=false;
+ }
+
+}
Property changes on: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2011-01-06 16:13:38
|
Revision: 53
http://udt-java.svn.sourceforge.net/udt-java/?rev=53&view=rev
Author: bschuller
Date: 2011-01-06 16:13:32 +0000 (Thu, 06 Jan 2011)
Log Message:
-----------
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTClient.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/main/java/udt/util/MeanValue.java
udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
udt-java/trunk/src/test/java/udt/performance/UDPTest.java
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -117,12 +117,17 @@
if(localPort>0)this.port = localPort;
else port=dgSocket.getLocalPort();
+ configureSocket();
+ }
+
+ protected void configureSocket()throws SocketException{
//set a time out to avoid blocking in doReceive()
dgSocket.setSoTimeout(100000);
//buffer size
dgSocket.setReceiveBufferSize(128*1024);
+ dgSocket.setReuseAddress(false);
}
-
+
/**
* bind to the default network interface on the machine
*
@@ -237,8 +242,6 @@
private long lastDestID=-1;
private UDTSession lastSession;
- //MeanValue v=new MeanValue("receiver processing ",true, 256);
-
private int n=0;
private final Object lock=new Object();
@@ -247,13 +250,10 @@
while(!stopped){
try{
try{
- //v.end();
//will block until a packet is received or timeout has expired
dgSocket.receive(dp);
- //v.begin();
-
Destination peer=new Destination(dp.getAddress(), dp.getPort());
int l=dp.getLength();
UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
Modified: udt-java/trunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTClient.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/UDTClient.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -93,14 +93,19 @@
/**
* sends the given data asynchronously
*
- * @param data
+ * @param data - the data to send
* @throws IOException
- * @throws InterruptedException
*/
- public void send(byte[]data)throws IOException, InterruptedException{
+ public void send(byte[]data)throws IOException{
clientSession.getSocket().doWrite(data);
}
+ /**
+ * sends the given data and waits for acknowledgement
+ * @param data - the data to send
+ * @throws IOException
+ * @throws InterruptedException if interrupted while waiting for ack
+ */
public void sendBlocking(byte[]data)throws IOException, InterruptedException{
clientSession.getSocket().doWriteBlocking(data);
}
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -183,13 +183,13 @@
private MeanValue dataProcessTime;
private void initMetrics(){
if(!storeStatistics)return;
- dgReceiveInterval=new MeanValue("UDT receive interval");
+ dgReceiveInterval=new MeanValue("RECEIVER: UDT receive interval");
statistics.addMetric(dgReceiveInterval);
- dataPacketInterval=new MeanValue("Data packet interval");
+ dataPacketInterval=new MeanValue("RECEIVER: Data packet interval");
statistics.addMetric(dataPacketInterval);
- processTime=new MeanValue("UDT packet process time");
+ processTime=new MeanValue("RECEIVER: UDT packet process time");
statistics.addMetric(processTime);
- dataProcessTime=new MeanValue("Data packet process time");
+ dataProcessTime=new MeanValue("RECEIVER: Data packet process time");
statistics.addMetric(dataProcessTime);
}
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -125,7 +125,7 @@
statistics=session.getStatistics();
senderLossList=new SenderLossList();
sendBuffer=new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(),0.75f,2);
- sendQueue = new ArrayBlockingQueue<DataPacket>(1000);
+ sendQueue = new ArrayBlockingQueue<DataPacket>(session.getFlowWindowSize(), /*fairness*/ true);
lastAckSequenceNumber=session.getInitialSequenceNumber();
currentSequenceNumber=session.getInitialSequenceNumber()-1;
waitForAckLatch.set(new CountDownLatch(1));
@@ -140,11 +140,11 @@
private MeanThroughput throughput;
private void initMetrics(){
if(!storeStatistics)return;
- dgSendTime=new MeanValue("Datagram send time");
+ dgSendTime=new MeanValue("SENDER: Datagram send time");
statistics.addMetric(dgSendTime);
- dgSendInterval=new MeanValue("Datagram send interval");
+ dgSendInterval=new MeanValue("SENDER: Datagram send interval");
statistics.addMetric(dgSendInterval);
- throughput=new MeanThroughput("Throughput", session.getDatagramSize());
+ throughput=new MeanThroughput("SENDER: Throughput", session.getDatagramSize());
statistics.addMetric(throughput);
}
@@ -338,7 +338,7 @@
if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
&& unAcknowledged<session.getFlowWindowSize()){
//check for application data
- DataPacket dp=sendQueue.poll(Util.SYN,TimeUnit.MICROSECONDS);
+ DataPacket dp=sendQueue.poll();
if(dp!=null){
send(dp);
largestSentSequenceNumber=dp.getPacketSequenceNumber();
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -33,6 +33,8 @@
package udt.sender;
import java.util.LinkedList;
+import udt.util.MeanValue;
+
/**
* stores the sequence number of the lost packets in increasing order
*/
@@ -49,15 +51,15 @@
public void insert(Long obj){
synchronized (backingList) {
- if(!backingList.contains(obj)){
- for(int i=0;i<backingList.size();i++){
- if(obj<backingList.get(i)){
- backingList.add(i,obj);
- return;
- }
+ for(int i=0;i<backingList.size();i++){
+ Long entry=backingList.get(i);
+ if(obj<entry){
+ backingList.add(i,obj);
+ return;
}
- backingList.add(obj);
+ else if(obj==entry)return;
}
+ backingList.add(obj);
}
}
@@ -69,7 +71,7 @@
return backingList.poll();
}
}
-
+
public boolean isEmpty(){
return backingList.isEmpty();
}
Modified: udt-java/trunk/src/main/java/udt/util/MeanValue.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/util/MeanValue.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -9,6 +9,8 @@
public class MeanValue {
private double mean=0;
+ private double max=0;
+ private double min=0;
private int n=0;
@@ -43,9 +45,15 @@
public void addValue(double value){
mean=(mean*n+value)/(n+1);
n++;
+ max=Math.max(max, value);
+ min=Math.min(max, value);
+
if(verbose && n % nValue == 0){
- if(msg!=null)System.out.println(msg+" "+getFormattedMean());
- else System.out.println(name+getFormattedMean());
+ if(msg!=null)System.out.println(msg+" "+get());
+ else System.out.println(name+" "+get());
+
+ max=0;
+ min=0;
}
}
@@ -57,6 +65,10 @@
return format.format(getMean());
}
+ public String get(){
+ return format.format(getMean())+" max="+format.format(max)+" min="+format.format(min);
+ }
+
public void clear(){
mean=0;
n=0;
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -44,7 +44,6 @@
lock=new ReentrantLock(false);
notEmpty=lock.newCondition();
highestReadSequenceNumber=SequenceNumber.decrement(initialSequenceNumber);
- System.out.println("SIZE: "+size);
}
public boolean offer(AppData data){
@@ -121,14 +120,6 @@
}
else return null;
}
- // else{
- // System.out.println("empty HEAD at pos="+readPosition);
- // try{
- // Thread.sleep(1000);
- // Thread.yield();
- // }catch(InterruptedException e){};
- // }
-
return r;
}
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -104,6 +104,12 @@
}
long size=decode(sizeInfo, 0);
+ Boolean devNull=Boolean.getBoolean("udt.dev.null");
+ if(devNull){
+ while(true)Thread.sleep(10000);
+ }
+
+
File file=new File(new String(localFile));
System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">");
FileOutputStream fos=new FileOutputStream(file);
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -21,7 +21,7 @@
boolean running=false;
//how many
- int num_packets=200;
+ int num_packets=300;
//how large is a single packet
int size=1*1024*1024;
@@ -32,6 +32,8 @@
public void test1()throws Exception{
Logger.getLogger("udt").setLevel(Level.INFO);
+// System.setProperty("udt.receiver.storeStatistics","true");
+// System.setProperty("udt.sender.storeStatistics","true");
UDTReceiver.dropRate=0;
TIMEOUT=Integer.MAX_VALUE;
doTest();
@@ -109,6 +111,10 @@
Runnable serverProcess=new Runnable(){
public void run(){
try{
+ Boolean devNull=Boolean.getBoolean("udt.dev.null");
+ if(devNull){
+ while(true)Thread.sleep(10000);
+ }
MessageDigest md5=MessageDigest.getInstance("MD5");
long start=System.currentTimeMillis();
UDTSocket s=serverSocket.accept();
Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-11-11 21:56:26 UTC (rev 52)
+++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2011-01-06 16:13:32 UTC (rev 53)
@@ -23,8 +23,10 @@
public void test1()throws Exception{
runServer();
runThirdThread();
+
//client socket
DatagramSocket s=new DatagramSocket(12345);
+
//generate a test array with random content
N=num_packets*packetSize;
byte[]data=new byte[packetSize];
@@ -34,32 +36,29 @@
dp.setAddress(InetAddress.getByName("localhost"));
dp.setPort(65321);
System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes");
- MeanValue v=new MeanValue("Datagram send time",false);
- MeanValue v2=new MeanValue("Datagram send interval",false);
- MeanValue v3=new MeanValue("Encoding time",false);
+ MeanValue dgSendTime=new MeanValue("Datagram send time",false);
+ MeanValue dgSendInterval=new MeanValue("Datagram send interval",false);
for(int i=0;i<num_packets;i++){
DataPacket p=new DataPacket();
p.setData(data);
- v3.begin();
dp.setData(p.getEncoded());
- v3.end();
- v2.end();
- v.begin();
+ dgSendInterval.end();
+ dgSendTime.begin();
s.send(dp);
- v.end();
- v2.begin();
+ dgSendTime.end();
+ dgSendInterval.begin();
}
System.out.println("Finished sending.");
while(serverRunning)Thread.sleep(10);
System.out.println("Server stopped.");
long end=System.currentTimeMillis();
System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms");
- System.out.println("Rate "+N/1000/(end-start)+" Mbytes/sec");
+ float rate=N/1000/(end-start);
+ System.out.println("Rate "+rate+" Mbytes/sec "+(rate*8)+ " Mbit/sec");
System.out.println("Rate "+num_packets+" packets/sec");
- System.out.println("Mean send time "+v.getFormattedMean()+" microsec");
- System.out.println("Mean send interval "+v2.getFormattedMean()+" microsec");
- System.out.println("Datapacket encoding time "+v3.getFormattedMean()+" microsec");
+ System.out.println("Mean send time "+dgSendTime.get());
+ System.out.println("Mean send interval "+dgSendInterval.get());
System.out.println("Server received: "+total);
}
@@ -79,6 +78,7 @@
while(true){
serverSocket.receive(dp);
handoff.offer(dp);
+ total+=dp.getLength();
}
}
catch(Exception e){
@@ -117,5 +117,5 @@
t.start();
}
-
+
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2010-11-11 23:22:55
|
Revision: 52
http://udt-java.svn.sourceforge.net/udt-java/?rev=52&view=rev
Author: bschuller
Date: 2010-11-11 21:56:26 +0000 (Thu, 11 Nov 2010)
Log Message:
-----------
fix sender loss list (entries were not ordered)
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
udt-java/trunk/src/test/java/udt/TestList.java
Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java
===================================================================
--- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-09-23 09:09:56 UTC (rev 51)
+++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-11-11 21:56:26 UTC (rev 52)
@@ -51,7 +51,7 @@
synchronized (backingList) {
if(!backingList.contains(obj)){
for(int i=0;i<backingList.size();i++){
- if(obj<backingList.getFirst()){
+ if(obj<backingList.get(i)){
backingList.add(i,obj);
return;
}
Modified: udt-java/trunk/src/test/java/udt/TestList.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestList.java 2010-09-23 09:09:56 UTC (rev 51)
+++ udt-java/trunk/src/test/java/udt/TestList.java 2010-11-11 21:56:26 UTC (rev 52)
@@ -88,6 +88,10 @@
assertEquals(3,l.size());
Long oldest=l.getFirstEntry();
assertEquals(C,oldest);
+ oldest=l.getFirstEntry();
+ assertEquals(A,oldest);
+ oldest=l.getFirstEntry();
+ assertEquals(B,oldest);
}
public void testReceiverInputQueue(){
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2010-09-23 09:10:02
|
Revision: 51
http://udt-java.svn.sourceforge.net/udt-java/?rev=51&view=rev
Author: bschuller
Date: 2010-09-23 09:09:56 +0000 (Thu, 23 Sep 2010)
Log Message:
-----------
update version number
Modified Paths:
--------------
udt-java/trunk/pom.xml
Modified: udt-java/trunk/pom.xml
===================================================================
--- udt-java/trunk/pom.xml 2010-09-17 11:04:39 UTC (rev 50)
+++ udt-java/trunk/pom.xml 2010-09-23 09:09:56 UTC (rev 51)
@@ -5,7 +5,7 @@
<artifactId>udt-java</artifactId>
<packaging>jar</packaging>
<name>UDT Java implementation</name>
- <version>0.5-SNAPSHOT</version>
+ <version>0.6-SNAPSHOT</version>
<url>http://sourceforge.net/projects/udt-java</url>
<developers>
<developer>
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <bsc...@us...> - 2010-09-17 11:04:46
|
Revision: 50
http://udt-java.svn.sourceforge.net/udt-java/?rev=50&view=rev
Author: bschuller
Date: 2010-09-17 11:04:39 +0000 (Fri, 17 Sep 2010)
Log Message:
-----------
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
udt-java/trunk/src/main/java/udt/util/Util.java
udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -386,14 +386,14 @@
protected void onDataPacketReceived(DataPacket dp)throws IOException{
long currentSequenceNumber = dp.getPacketSequenceNumber();
- //check whether to drop this packet
+ //for TESTING : check whether to drop this packet
// n++;
// //if(dropRate>0 && n % dropRate == 0){
-// if(n==666){
-// logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING");
-// return;
-// }
-//
+// if(n % 1111 == 0){
+// logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING");
+// return;
+// }
+// //}
boolean OK=session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData());
if(!OK){
//need to drop packet...
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -61,6 +61,7 @@
int insert=offset% size;
buffer[insert]=data;
numValidChunks.incrementAndGet();
+ notEmpty.signal();
return true;
}finally{
lock.unlock();
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -32,8 +32,10 @@
package udt.util;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.text.NumberFormat;
@@ -101,22 +103,16 @@
total+=r;
}
long size=decode(sizeInfo, 0);
- if(verbose){
- StringBuilder sb=new StringBuilder();
- for(int i=0;i<sizeInfo.length;i++){
- sb.append(Integer.toString(sizeInfo[i]));
- sb.append(" ");
- }
- System.out.println("[ReceiveFile] Size info: "+sb.toString());
- }
+
File file=new File(new String(localFile));
System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">");
FileOutputStream fos=new FileOutputStream(file);
+ OutputStream os=new BufferedOutputStream(fos,1024*1024);
try{
System.out.println("[ReceiveFile] Reading <"+size+"> bytes.");
long start = System.currentTimeMillis();
//and read the file data
- Util.copy(in, fos, size, false);
+ Util.copy(in, os, size, false);
long end = System.currentTimeMillis();
double rate=1000.0*size/1024/1024/(end-start);
System.out.println("[ReceiveFile] Rate: "+format.format(rate)+" MBytes/sec. "
Modified: udt-java/trunk/src/main/java/udt/util/Util.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/Util.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -134,6 +134,7 @@
c=source.read(buf);
if(c<0)break;
read+=c;
+ //System.out.println("writing <"+c+"> bytes");
target.write(buf, 0, c);
if(flush)target.flush();
if(read>=size && size>-1)break;
Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java
===================================================================
--- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -21,7 +21,7 @@
boolean running=false;
//how many
- int num_packets=500;
+ int num_packets=200;
//how large is a single packet
int size=1*1024*1024;
Modified: udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49)
+++ udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java 2010-09-17 11:04:39 UTC (rev 50)
@@ -16,79 +16,79 @@
byte[]test1="test1".getBytes();
byte[]test2="test2".getBytes();
byte[]test3="test3".getBytes();
-
+
b.offer(new AppData(1l,test1));
b.offer(new AppData(2l,test2));
b.offer(new AppData(3l,test3));
-
+
AppData a=b.poll();
assertEquals(1l,a.getSequenceNumber());
-
+
a=b.poll();
assertEquals(2l,a.getSequenceNumber());
-
+
a=b.poll();
assertEquals(3l,a.getSequenceNumber());
-
+
assertNull(b.poll());
}
-
+
public void testOutOfOrder(){
ReceiveBuffer b=new ReceiveBuffer(16,1);
byte[]test1="test1".getBytes();
byte[]test2="test2".getBytes();
byte[]test3="test3".getBytes();
-
+
b.offer(new AppData(3l,test3));
b.offer(new AppData(2l,test2));
b.offer(new AppData(1l,test1));
-
+
AppData a=b.poll();
assertEquals(1l,a.getSequenceNumber());
-
+
a=b.poll();
assertEquals(2l,a.getSequenceNumber());
-
+
a=b.poll();
assertEquals(3l,a.getSequenceNumber());
-
+
assertNull(b.poll());
}
-
+
public void testInterleaved(){
ReceiveBuffer b=new ReceiveBuffer(16,1);
byte[]test1="test1".getBytes();
byte[]test2="test2".getBytes();
byte[]test3="test3".getBytes();
-
+
b.offer(new AppData(3l,test3));
-
+
b.offer(new AppData(1l,test1));
-
+
AppData a=b.poll();
assertEquals(1l,a.getSequenceNumber());
-
+
assertNull(b.poll());
-
+
b.offer(new AppData(2l,test2));
-
+
a=b.poll();
assertEquals(2l,a.getSequenceNumber());
-
+
a=b.poll();
assertEquals(3l,a.getSequenceNumber());
}
-
+
public void testOverflow(){
ReceiveBuffer b=new ReceiveBuffer(4,1);
-
+
for(int i=0; i<3; i++){
b.offer(new AppData(i+1,"test".getBytes()));
}
for(int i=0; i<3; i++){
assertEquals(i+1, b.poll().getSequenceNumber());
}
-
+
for(int i=0; i<3; i++){
b.offer(new AppData(i+4,"test".getBytes()));
}
@@ -96,13 +96,13 @@
assertEquals(i+4, b.poll().getSequenceNumber());
}
}
-
-
+
+
public void testTimedPoll()throws Exception{
final ReceiveBuffer b=new ReceiveBuffer(4,1);
-
+
Runnable write=new Runnable(){
-
+
public void run(){
try{
for(int i=0; i<5; i++){
@@ -115,7 +115,7 @@
}
}
};
-
+
Callable<String> reader=new Callable<String>(){
public String call() throws Exception {
for(int i=0; i<5; i++){
@@ -131,12 +131,60 @@
return "OK.";
}
};
+
+ ScheduledExecutorService es=Executors.newScheduledThreadPool(2);
+ es.execute(write);
+ Future<String>res=es.submit(reader);
+ res.get();
+ es.shutdownNow();
+ }
+
+
+ volatile boolean poll=false;
+
+ public void testTimedPoll2()throws Exception{
+ final ReceiveBuffer b=new ReceiveBuffer(4,1);
+ Runnable write=new Runnable(){
+
+ public void run(){
+ try{
+ Thread.sleep(2979);
+ System.out.println("PUT");
+ while(!poll)Thread.sleep(10);
+ b.offer(new AppData(1,"test".getBytes()));
+ System.out.println("... PUT OK");
+ }
+ catch(Exception e){
+ e.printStackTrace();
+ fail();
+ }
+ }
+ };
+
+ Callable<String> reader=new Callable<String>(){
+ public String call() throws Exception {
+ AppData r=null;
+ do{
+ try{
+ poll=true;
+ System.out.println("POLL");
+ r=b.poll(1000, TimeUnit.MILLISECONDS);
+ poll=false;
+ if(r!=null)System.out.println("... POLL OK");
+ else System.out.println("... nothing.");
+ }catch(InterruptedException ie){
+ ie.printStackTrace();
+ }
+ }while(r==null);
+ return "OK.";
+ }
+ };
+
ScheduledExecutorService es=Executors.newScheduledThreadPool(2);
es.execute(write);
Future<String>res=es.submit(reader);
res.get();
es.shutdownNow();
}
-
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|