[Udt-java-commits] SF.net SVN: udt-java:[74] udt-java/trunk
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2012-05-25 08:03:15
|
Revision: 74
http://udt-java.svn.sourceforge.net/udt-java/?rev=74&view=rev
Author: bschuller
Date: 2012-05-25 08:03:03 +0000 (Fri, 25 May 2012)
Log Message:
-----------
Attempt to switch to the new UDT-C++ style of connection handshake. DOES NOT WORK yet.
Modified Paths:
--------------
udt-java/trunk/pom.xml
udt-java/trunk/src/main/java/udt/ClientSession.java
udt-java/trunk/src/main/java/udt/ServerSession.java
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTClient.java
udt-java/trunk/src/main/java/udt/UDTInputStream.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/UDTSender.java
udt-java/trunk/src/main/java/udt/UDTServerSocket.java
udt-java/trunk/src/main/java/udt/UDTSession.java
udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java
udt-java/trunk/src/main/java/udt/packets/PacketFactory.java
udt-java/trunk/src/main/java/udt/packets/PacketUtil.java
udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/trunk/src/main/java/udt/util/SequenceNumber.java
udt-java/trunk/src/test/java/echo/EchoServer.java
udt-java/trunk/src/test/java/udt/TestUDTInputStream.java
udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java
Modified: udt-java/trunk/pom.xml
===================================================================
--- udt-java/trunk/pom.xml 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/pom.xml 2012-05-25 08:03:03 UTC (rev 74)
@@ -5,7 +5,7 @@
<artifactId>udt-java</artifactId>
<packaging>jar</packaging>
<name>UDT Java implementation</name>
- <version>0.6-SNAPSHOT</version>
+ <version>0.7-SNAPSHOT</version>
<url>http://sourceforge.net/projects/udt-java</url>
<developers>
<developer>
Modified: udt-java/trunk/src/main/java/udt/ClientSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/ClientSession.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/ClientSession.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -52,6 +52,8 @@
private UDPEndPoint endPoint;
+ long initialSequenceNo=SequenceNumber.random();
+
public ClientSession(UDPEndPoint endPoint, Destination dest)throws SocketException{
super("ClientSession localPort="+endPoint.getLocalPort(),dest);
this.endPoint=endPoint;
@@ -60,7 +62,7 @@
/**
* send connection handshake until a reply from server is received
- * TODO check for timeout
+
* @throws InterruptedException
* @throws IOException
*/
@@ -68,11 +70,21 @@
public void connect() throws InterruptedException,IOException{
int n=0;
while(getState()!=ready){
- sendHandShake();
if(getState()==invalid)throw new IOException("Can't connect!");
- n++;
- if(getState()!=ready)Thread.sleep(500);
+ if(getState()<=handshaking){
+ setState(handshaking);
+ sendInitialHandShake();
+ }
+ else if(getState()==handshaking+1){
+ sendSecondHandshake();
+ }
+
+ if(getState()==invalid)throw new IOException("Can't connect!");
+ if(n++ > 10)throw new IOException("Could not connect to server within the timeout.");
+
+ Thread.sleep(500);
}
+ Thread.sleep(1000);
cc.init();
logger.info("Connected, "+n+" handshake packets sent");
}
@@ -82,38 +94,10 @@
lastPacket=packet;
- if (packet instanceof ConnectionHandshake) {
+ if (packet.isConnectionHandshake()) {
ConnectionHandshake hs=(ConnectionHandshake)packet;
-
- logger.info("Received connection handshake from "+peer+"\n"+hs);
-
- if (getState()!=ready) {
- if(hs.getConnectionType()==1){
- try{
- //TODO validate parameters sent by peer
- long peerSocketID=hs.getSocketID();
- destination.setSocketID(peerSocketID);
- sendConfirmation(hs);
- }catch(Exception ex){
- logger.log(Level.WARNING,"Error creating socket",ex);
- setState(invalid);
- }
- return;
- }
- else{
- try{
- //TODO validate parameters sent by peer
- long peerSocketID=hs.getSocketID();
- destination.setSocketID(peerSocketID);
- setState(ready);
- socket=new UDTSocket(endPoint,this);
- }catch(Exception ex){
- logger.log(Level.WARNING,"Error creating socket",ex);
- setState(invalid);
- }
- return;
- }
- }
+ handleConnectionHandshake(hs,peer);
+ return;
}
if(getState() == ready) {
@@ -140,9 +124,43 @@
}
}
+ protected void handleConnectionHandshake(ConnectionHandshake hs, Destination peer){
- //handshake for connect
- protected void sendHandShake()throws IOException{
+ if (getState()==handshaking) {
+ logger.info("Received initial handshake response from "+peer+"\n"+hs);
+ if(hs.getConnectionType()==ConnectionHandshake.CONNECTION_SERVER_ACK){
+ try{
+ //TODO validate parameters sent by peer
+ long peerSocketID=hs.getSocketID();
+ sessionCookie=hs.getCookie();
+ destination.setSocketID(peerSocketID);
+ setState(handshaking+1);
+ }catch(Exception ex){
+ logger.log(Level.WARNING,"Error creating socket",ex);
+ setState(invalid);
+ }
+ return;
+ }
+ else{
+ logger.info("Unexpected type of handshake packet received");
+ setState(invalid);
+ }
+ }
+ else if(getState()==handshaking+1){
+ try{
+ logger.info("Received confirmation handshake response from "+peer+"\n"+hs);
+ //TODO validate parameters sent by peer
+ setState(ready);
+ socket=new UDTSocket(endPoint,this);
+ }catch(Exception ex){
+ logger.log(Level.WARNING,"Error creating socket",ex);
+ setState(invalid);
+ }
+ }
+ }
+
+ //initial handshake for connect
+ protected void sendInitialHandShake()throws IOException{
ConnectionHandshake handshake = new ConnectionHandshake();
handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR);
handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM);
@@ -153,20 +171,24 @@
handshake.setSocketID(mySocketID);
handshake.setMaxFlowWndSize(flowWindowSize);
handshake.setSession(this);
+ handshake.setAddress(endPoint.getLocalAddress());
logger.info("Sending "+handshake);
endPoint.doSend(handshake);
}
//2nd handshake for connect
- protected void sendConfirmation(ConnectionHandshake hs)throws IOException{
+ protected void sendSecondHandshake()throws IOException{
ConnectionHandshake handshake = new ConnectionHandshake();
- handshake.setConnectionType(-1);
+ handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR);
handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM);
- handshake.setInitialSeqNo(hs.getInitialSeqNo());
- handshake.setPacketSize(hs.getPacketSize());
+ handshake.setInitialSeqNo(initialSequenceNo);
+ handshake.setPacketSize(getDatagramSize());
handshake.setSocketID(mySocketID);
handshake.setMaxFlowWndSize(flowWindowSize);
handshake.setSession(this);
+ handshake.setCookie(sessionCookie);
+ handshake.setAddress(endPoint.getLocalAddress());
+ handshake.setDestinationID(getDestination().getSocketID());
logger.info("Sending confirmation "+handshake);
endPoint.doSend(handshake);
}
Modified: udt-java/trunk/src/main/java/udt/ServerSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/ServerSession.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/ServerSession.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -33,7 +33,6 @@
package udt;
import java.io.IOException;
-import java.net.DatagramPacket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.logging.Level;
@@ -43,6 +42,7 @@
import udt.packets.Destination;
import udt.packets.KeepAlive;
import udt.packets.Shutdown;
+import udt.util.SequenceNumber;
/**
* server side session in client-server mode
@@ -56,10 +56,10 @@
//last received packet (for testing purposes)
private UDTPacket lastPacket;
- public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{
- super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new Destination(dp.getAddress(),dp.getPort()));
+ public ServerSession(Destination peer, UDPEndPoint endPoint)throws SocketException,UnknownHostException{
+ super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+peer.getAddress()+":"+peer.getPort(),peer);
this.endPoint=endPoint;
- logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort());
+ logger.info("Created "+toString()+" talking to "+peer.getAddress()+":"+peer.getPort());
}
int n_handshake=0;
@@ -68,79 +68,45 @@
public void received(UDTPacket packet, Destination peer){
lastPacket=packet;
- if(packet instanceof ConnectionHandshake) {
- ConnectionHandshake connectionHandshake=(ConnectionHandshake)packet;
- logger.info("Received "+connectionHandshake);
+ if(packet.isConnectionHandshake()) {
+ handleHandShake((ConnectionHandshake)packet);
+ return;
+ }
- if (getState()<=ready){
- destination.setSocketID(connectionHandshake.getSocketID());
-
- if(getState()<=handshaking){
- setState(handshaking);
- }
- try{
- handleHandShake(connectionHandshake);
- n_handshake++;
- try{
- setState(ready);
- socket=new UDTSocket(endPoint, this);
- cc.init();
- }catch(Exception uhe){
- //session is invalid
- logger.log(Level.SEVERE,"",uhe);
- setState(invalid);
- }
- }catch(IOException ex){
- //session invalid
- logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex);
- setState(invalid);
- }
- return;
- }
-
- }else if(packet instanceof KeepAlive) {
+ if(packet instanceof KeepAlive) {
socket.getReceiver().resetEXPTimer();
active = true;
return;
}
- if(getState()== ready) {
- active = true;
-
- if (packet instanceof KeepAlive) {
- //nothing to do here
- return;
- }else if (packet instanceof Shutdown) {
- try{
- socket.getReceiver().stop();
- }catch(IOException ex){
- logger.log(Level.WARNING,"",ex);
- }
- setState(shutdown);
- System.out.println("SHUTDOWN ***");
- active = false;
- logger.info("Connection shutdown initiated by the other side.");
- return;
+ if (packet instanceof Shutdown) {
+ try{
+ socket.getReceiver().stop();
+ }catch(IOException ex){
+ logger.log(Level.WARNING,"",ex);
}
+ setState(shutdown);
+ active = false;
+ logger.info("Connection shutdown initiated by peer.");
+ return;
+ }
- else{
- try{
- if(packet.forSender()){
- socket.getSender().receive(packet);
- }else{
- socket.getReceiver().receive(packet);
- }
- }catch(Exception ex){
- //session invalid
- logger.log(Level.SEVERE,"",ex);
- setState(invalid);
+ if(getState() == ready) {
+ active = true;
+ try{
+ if(packet.forSender()){
+ socket.getSender().receive(packet);
+ }else{
+ socket.getReceiver().receive(packet);
}
+ }catch(Exception ex){
+ //session invalid
+ logger.log(Level.SEVERE,"",ex);
+ setState(invalid);
}
return;
-
}
-
}
/**
@@ -151,16 +117,73 @@
}
/**
- * handle the connection handshake:<br/>
- * <ul>
- * <li>set initial sequence number</li>
- * <li>send response handshake</li>
- * </ul>
+ * reply to a connection handshake message
+ * @param connectionHandshake
+ */
+ protected void handleHandShake(ConnectionHandshake connectionHandshake){
+ logger.info("Received "+connectionHandshake + " in state <"+getState()+">");
+ if(getState()==ready){
+ //just send confirmation packet again
+ try{
+ sendFinalHandShake(connectionHandshake);
+ }catch(IOException io){}
+ return;
+ }
+
+ if (getState()<ready){
+ destination.setSocketID(connectionHandshake.getSocketID());
+
+ if(getState()<handshaking){
+ setState(handshaking);
+ }
+
+ try{
+ n_handshake++;
+ boolean handShakeComplete=handleSecondHandShake(connectionHandshake);
+ if(handShakeComplete){
+ logger.info("Client/Server handshake complete!");
+ setState(ready);
+ socket=new UDTSocket(endPoint, this);
+ cc.init();
+ }
+ }catch(IOException ex){
+ //session invalid
+ logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex);
+ setState(invalid);
+ }
+ }
+ }
+
+ private ConnectionHandshake finalConnectionHandshake;
+
+ /**
+ * handle the connection handshake
+ *
* @param handshake
* @param peer
* @throws IOException
*/
- protected void handleHandShake(ConnectionHandshake handshake)throws IOException{
+ protected boolean handleSecondHandShake(ConnectionHandshake handshake)throws IOException{
+ if(sessionCookie==0){
+ ackInitialHandshake(handshake);
+ //need one more handshake
+ return false;
+ }
+
+ long otherCookie=handshake.getCookie();
+ if(sessionCookie!=otherCookie){
+ setState(invalid);
+ throw new IOException("Invalid cookie <"+otherCookie+"> received, my cookie is <"+sessionCookie+">");
+ }
+ sendFinalHandShake(handshake);
+ return true;
+ }
+
+ /*
+ * response after the initial connection handshake received:
+ * compute cookie
+ */
+ protected void ackInitialHandshake(ConnectionHandshake handshake)throws IOException{
ConnectionHandshake responseHandshake = new ConnectionHandshake();
//compare the packet size and choose minimun
long clientBufferSize=handshake.getPacketSize();
@@ -178,12 +201,40 @@
responseHandshake.setSocketID(mySocketID);
responseHandshake.setDestinationID(this.getDestination().getSocketID());
responseHandshake.setSession(this);
+ sessionCookie=SequenceNumber.random();
+ responseHandshake.setCookie(sessionCookie);
+ responseHandshake.setAddress(endPoint.getLocalAddress());
logger.info("Sending reply "+responseHandshake);
endPoint.doSend(responseHandshake);
}
+ protected void sendFinalHandShake(ConnectionHandshake handshake)throws IOException{
+ if(finalConnectionHandshake==null){
+ finalConnectionHandshake= new ConnectionHandshake();
+ //compare the packet size and choose minimun
+ long clientBufferSize=handshake.getPacketSize();
+ long myBufferSize=getDatagramSize();
+ long bufferSize=Math.min(clientBufferSize, myBufferSize);
+ long initialSequenceNumber=handshake.getInitialSeqNo();
+ setInitialSequenceNumber(initialSequenceNumber);
+ setDatagramSize((int)bufferSize);
+ finalConnectionHandshake.setPacketSize(bufferSize);
+ finalConnectionHandshake.setUdtVersion(4);
+ finalConnectionHandshake.setInitialSeqNo(initialSequenceNumber);
+ finalConnectionHandshake.setConnectionType(-1);
+ finalConnectionHandshake.setMaxFlowWndSize(handshake.getMaxFlowWndSize());
+ //tell peer what the socket ID on this side is
+ finalConnectionHandshake.setSocketID(mySocketID);
+ finalConnectionHandshake.setDestinationID(this.getDestination().getSocketID());
+ finalConnectionHandshake.setSession(this);
+ finalConnectionHandshake.setCookie(sessionCookie);
+ finalConnectionHandshake.setAddress(endPoint.getLocalAddress());
+ }
+ logger.info("Sending final handshake ack "+finalConnectionHandshake);
+ endPoint.doSend(finalConnectionHandshake);
+ }
}
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -40,6 +40,8 @@
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
@@ -70,10 +72,12 @@
//last received packet
private UDTPacket lastPacket;
+ private final Map<Destination,UDTSession> sessionsBeingConnected=Collections.synchronizedMap(new HashMap<Destination,UDTSession>());
+
//if the endpoint is configured for a server socket,
//this queue is used to handoff new UDTSessions to the application
private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>();
-
+
private boolean serverSocketMode=false;
//has the endpoint been stopped?
@@ -90,7 +94,7 @@
this.dgSocket=socket;
port=dgSocket.getLocalPort();
}
-
+
/**
* bind to any local port on the given host address
* @param localAddress
@@ -116,7 +120,7 @@
}
if(localPort>0)this.port = localPort;
else port=dgSocket.getLocalPort();
-
+
configureSocket();
}
@@ -127,7 +131,7 @@
dgSocket.setReceiveBufferSize(128*1024);
dgSocket.setReuseAddress(false);
}
-
+
/**
* bind to the default network interface on the machine
*
@@ -240,79 +244,75 @@
* </ul>
* @throws IOException
*/
- private long lastDestID=-1;
- private UDTSession lastSession;
-
- private int n=0;
-
- private final Object lock=new Object();
-
protected void doReceive()throws IOException{
while(!stopped){
try{
- try{
-
- //will block until a packet is received or timeout has expired
- dgSocket.receive(dp);
-
- Destination peer=new Destination(dp.getAddress(), dp.getPort());
- int l=dp.getLength();
- UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
- lastPacket=packet;
+ //will block until a packet is received or timeout has expired
+ dgSocket.receive(dp);
- //handle connection handshake
- if(packet.isConnectionHandshake()){
- synchronized(lock){
- Long id=Long.valueOf(packet.getDestinationID());
- UDTSession session=sessions.get(id);
- if(session==null){
- session=new ServerSession(dp,this);
- addSession(session.getSocketID(),session);
- //TODO need to check peer to avoid duplicate server session
- if(serverSocketMode){
- logger.fine("Pooling new request.");
- sessionHandoff.put(session);
- logger.fine("Request taken for processing.");
- }
- }
- peer.setSocketID(((ConnectionHandshake)packet).getSocketID());
- session.received(packet,peer);
- }
- }
- else{
- //dispatch to existing session
- long dest=packet.getDestinationID();
- UDTSession session;
- if(dest==lastDestID){
- session=lastSession;
- }
- else{
- session=sessions.get(dest);
- lastSession=session;
- lastDestID=dest;
- }
- if(session==null){
- n++;
- if(n%100==1){
- logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName());
- }
- }
- else{
- session.received(packet,peer);
- }
- }
- }catch(SocketException ex){
- logger.log(Level.INFO, "SocketException: "+ex.getMessage());
- }catch(SocketTimeoutException ste){
- //can safely ignore... we will retry until the endpoint is stopped
+ Destination peer=new Destination(dp.getAddress(), dp.getPort());
+ int l=dp.getLength();
+ UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
+ lastPacket=packet;
+
+ long dest=packet.getDestinationID();
+ UDTSession session=sessions.get(dest);
+ if(session!=null){
+ //dispatch to existing session
+ session.received(packet,peer);
}
-
+ else if(packet.isConnectionHandshake()){
+ connectionHandshake((ConnectionHandshake)packet, peer);
+ }
+ else{
+ logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName());
+ }
+ }catch(SocketException ex){
+ logger.log(Level.INFO, "SocketException: "+ex.getMessage());
+ }catch(SocketTimeoutException ste){
+ //can safely ignore... we will retry until the endpoint is stopped
}catch(Exception ex){
logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex);
}
}
}
+ /**
+ * called when a "connection handshake" packet was received and no
+ * matching session yet exists
+ *
+ * @param packet
+ * @param peer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected synchronized void connectionHandshake(ConnectionHandshake packet, Destination peer)throws IOException, InterruptedException{
+ Destination p=new Destination(peer.getAddress(),peer.getPort());
+ UDTSession session=sessionsBeingConnected.get(peer);
+ long destID=packet.getDestinationID();
+ if(session!=null && session.getSocketID()==destID){
+ //confirmation handshake
+ sessionsBeingConnected.remove(p);
+ addSession(destID, session);
+ }
+ else if(session==null){
+ session=new ServerSession(peer,this);
+ sessionsBeingConnected.put(p,session);
+ sessions.put(session.getSocketID(), session);
+ if(serverSocketMode){
+ logger.fine("Pooling new request.");
+ sessionHandoff.put(session);
+ logger.fine("Request taken for processing.");
+ }
+ }
+ else {
+ throw new IOException("dest ID sent by client does not match");
+ }
+ Long peerSocketID=((ConnectionHandshake)packet).getSocketID();
+ peer.setSocketID(peerSocketID);
+ session.received(packet,peer);
+ }
+
protected void doSend(UDTPacket packet)throws IOException{
byte[]data=packet.getEncoded();
DatagramPacket dgp = packet.getSession().getDatagram();
@@ -327,4 +327,5 @@
public void sendRaw(DatagramPacket p)throws IOException{
dgSocket.send(p);
}
+
}
Modified: udt-java/trunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTClient.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTClient.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -80,12 +80,11 @@
//create client session...
clientSession=new ClientSession(clientEndpoint,destination);
clientEndpoint.addSession(clientSession.getSocketID(), clientSession);
-
clientEndpoint.start();
clientSession.connect();
//wait for handshake
while(!clientSession.isReady()){
- Thread.sleep(5);
+ Thread.sleep(50);
}
logger.info("The UDTClient is connected");
}
Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -78,9 +78,19 @@
@Override
public int read()throws IOException{
int b=0;
- while(b==0)
+ while(b==0){
b=read(single);
-
+ if(b==0){
+ try{
+ while(receiveBuffer.isEmpty()){
+ Thread.sleep(20);
+ }
+ }catch(InterruptedException ie){
+ throw new IOException(ie);
+ }
+ }
+ }
+
if(b>0){
return single[0] & 0xFF;
}
@@ -153,9 +163,7 @@
else currentChunk=receiveBuffer.poll(10, TimeUnit.MILLISECONDS);
}catch(InterruptedException ie){
- IOException ex=new IOException();
- ex.initCause(ie);
- throw ex;
+ throw new IOException(ie);
}
return;
}
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -196,9 +196,14 @@
//starts the sender algorithm
private void start(){
+
Runnable r=new Runnable(){
public void run(){
try{
+ while(session.getSocket()==null)Thread.sleep(100);
+ session.getSocket().getInputStream();
+
+ logger.info("STARTING RECEIVER for "+session);
nextACK=Util.getCurrentTime()+ackTimerInterval;
nextNAK=(long)(Util.getCurrentTime()+1.5*nakTimerInterval);
nextEXP=Util.getCurrentTime()+2*expTimerInterval;
@@ -224,6 +229,9 @@
*/
protected void receive(UDTPacket p)throws IOException{
if(storeStatistics)dgReceiveInterval.end();
+ if(!p.isControlPacket()){
+ System.out.println("++ "+p+" queuesize="+handoffQueue.size());
+ }
handoffQueue.offer(p);
if(storeStatistics)dgReceiveInterval.begin();
}
@@ -265,6 +273,7 @@
needEXPReset=true;
}
}
+
if(needEXPReset){
nextEXP=Util.getCurrentTime()+expTimerInterval;
}
Modified: udt-java/trunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSender.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTSender.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -151,7 +151,7 @@
* start the sender thread
*/
public void start(){
- logger.info("Starting sender for "+session);
+ logger.info("STARTING SENDER for "+session);
startLatch.countDown();
started=true;
}
Modified: udt-java/trunk/src/main/java/udt/UDTServerSocket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTServerSocket.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTServerSocket.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -31,6 +31,7 @@
*********************************************************************************/
package udt;
+
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
@@ -38,8 +39,8 @@
import java.util.logging.Logger;
-
public class UDTServerSocket {
+
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
private final UDPEndPoint endpoint;
Modified: udt-java/trunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSession.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/UDTSession.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -39,6 +39,7 @@
import java.util.logging.Logger;
import udt.packets.Destination;
+import udt.util.SequenceNumber;
import udt.util.UDTStatistics;
public abstract class UDTSession {
@@ -53,9 +54,9 @@
//state constants
public static final int start=0;
public static final int handshaking=1;
- public static final int ready=2;
- public static final int keepalive=3;
- public static final int shutdown=4;
+ public static final int ready=50;
+ public static final int keepalive=80;
+ public static final int shutdown=90;
public static final int invalid=99;
@@ -70,6 +71,9 @@
//cache dgPacket (peer stays the same always)
private DatagramPacket dgPacket;
+ //session cookie created during handshake
+ protected long sessionCookie=0;
+
/**
* flow window size, i.e. how many data packets are
* in-flight at a single time
@@ -209,7 +213,7 @@
public synchronized long getInitialSequenceNumber(){
if(initialSequenceNumber==null){
- initialSequenceNumber=1l; //TODO must be random?
+ initialSequenceNumber=SequenceNumber.random();
}
return initialSequenceNumber;
}
Modified: udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -33,6 +33,8 @@
package udt.packets;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
import udt.UDTSession;
@@ -49,21 +51,29 @@
private long packetSize;
private long maxFlowWndSize;
- public static final long CONNECTION_TYPE_REGULAR=1;
+ public static final long CONNECTION_TYPE_REGULAR=1L;
- public static final long CONNECTION_TYPE_RENDEZVOUS=0;
+ public static final long CONNECTION_TYPE_RENDEZVOUS=0L;
+ /**
+ * connection type in response handshake packet
+ */
+ public static final long CONNECTION_SERVER_ACK=-1L;
+
private long connectionType = CONNECTION_TYPE_REGULAR;//regular or rendezvous mode
private long socketID;
private long cookie=0;
+ //address of the UDP socket
+ private InetAddress address;
+
public ConnectionHandshake(){
this.controlPacketType=ControlPacketType.CONNECTION_HANDSHAKE.ordinal();
}
- public ConnectionHandshake(byte[]controlInformation){
+ public ConnectionHandshake(byte[]controlInformation)throws IOException{
this();
decode(controlInformation);
}
@@ -73,7 +83,7 @@
return true;
}
- void decode(byte[]data){
+ void decode(byte[]data)throws IOException{
udtVersion =PacketUtil.decode(data, 0);
socketType=PacketUtil.decode(data, 4);
initialSeqNo=PacketUtil.decode(data, 8);
@@ -81,9 +91,9 @@
maxFlowWndSize=PacketUtil.decode(data, 16);
connectionType=PacketUtil.decode(data, 20);
socketID=PacketUtil.decode(data, 24);
- if(data.length>28){
- cookie=PacketUtil.decode(data, 28);
- }
+ cookie=PacketUtil.decode(data, 28);
+ //TODO ipv6 check
+ address=PacketUtil.decodeInetAddress(data, 32, false);
}
public long getUdtVersion() {
@@ -134,11 +144,25 @@
public void setSocketID(long socketID) {
this.socketID = socketID;
}
+ public long getCookie() {
+ return cookie;
+ }
+ public void setCookie(long cookie) {
+ this.cookie = cookie;
+ }
+ public InetAddress getAddress() {
+ return address;
+ }
+
+ public void setAddress(InetAddress address) {
+ this.address = address;
+ }
+
@Override
public byte[] encodeControlInformation(){
try {
- ByteArrayOutputStream bos=new ByteArrayOutputStream(24);
+ ByteArrayOutputStream bos=new ByteArrayOutputStream(48);
bos.write(PacketUtil.encode(udtVersion));
bos.write(PacketUtil.encode(socketType));
bos.write(PacketUtil.encode(initialSeqNo));
@@ -146,6 +170,8 @@
bos.write(PacketUtil.encode(maxFlowWndSize));
bos.write(PacketUtil.encode(connectionType));
bos.write(PacketUtil.encode(socketID));
+ bos.write(PacketUtil.encode(cookie));
+ bos.write(PacketUtil.encode(address));
return bos.toByteArray();
} catch (Exception e) {
// can't happen
@@ -178,6 +204,10 @@
return false;
if (udtVersion != other.udtVersion)
return false;
+ if (cookie!=other.cookie)
+ return false;
+ if (!address.equals(other.address))
+ return false;
return true;
}
@@ -198,6 +228,7 @@
sb.append(", socketType=").append(socketType);
sb.append(", destSocketID=").append(destinationID);
if(cookie>0)sb.append(", cookie=").append(cookie);
+ sb.append(", address=").append(address);
sb.append("]");
return sb.toString();
}
Modified: udt-java/trunk/src/main/java/udt/packets/PacketFactory.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -31,6 +31,8 @@
*********************************************************************************/
package udt.packets;
+import java.io.IOException;
+
import udt.UDTPacket;
import udt.packets.ControlPacket.*;
@@ -42,13 +44,13 @@
* @param packetData
* @return
*/
- public static UDTPacket createPacket(byte[]encodedData){
+ public static UDTPacket createPacket(byte[]encodedData)throws IOException{
boolean isControl=(encodedData[0]&128) !=0 ;
if(isControl)return createControlPacket(encodedData,encodedData.length);
return new DataPacket(encodedData);
}
- public static UDTPacket createPacket(byte[]encodedData,int length){
+ public static UDTPacket createPacket(byte[]encodedData,int length)throws IOException{
boolean isControl=(encodedData[0]&128) !=0 ;
if(isControl)return createControlPacket(encodedData,length);
return new DataPacket(encodedData,length);
@@ -59,7 +61,7 @@
* @param packetData
* @return
*/
- public static ControlPacket createControlPacket(byte[]encodedData,int length){
+ public static ControlPacket createControlPacket(byte[]encodedData,int length)throws IOException{
ControlPacket packet=null;
Modified: udt-java/trunk/src/main/java/udt/packets/PacketUtil.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -32,7 +32,10 @@
package udt.packets;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
public class PacketUtil {
public static byte[]encode(long value){
@@ -80,4 +83,30 @@
return result;
}
+ /**
+ * encodes the specified address into 128 bit
+ * @param address - inet address
+ */
+ public static byte[] encode(InetAddress address){
+ byte[]res=new byte[16];
+ byte[]add=address.getAddress();
+ System.arraycopy(add, 0, res, 0, add.length);
+ return res;
+ }
+
+ public static InetAddress decodeInetAddress(byte[]data, int start, boolean ipV6)throws UnknownHostException{
+ InetAddress result=null;
+ byte[] add=ipV6?new byte[16]:new byte[4];
+ System.arraycopy(data, start, add, 0, add.length);
+ result=InetAddress.getByAddress(add);
+ return result;
+ }
+
+ public static void print(byte[]arr){
+ System.out.print("[");
+ for(byte b: arr){
+ System.out.print(" "+(b&0xFF));
+ }
+ System.out.println(" ]");
+ }
}
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -54,7 +54,9 @@
try{
long seq=data.getSequenceNumber();
//if already have this chunk, discard it
- if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0)return true;
+ if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0){
+ return true;
+ }
//else compute insert position
int offset=(int)SequenceNumber.seqOffset(initialSequenceNumber, seq);
int insert=offset% size;
Modified: udt-java/trunk/src/main/java/udt/util/SequenceNumber.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/SequenceNumber.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/main/java/udt/util/SequenceNumber.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -13,7 +13,7 @@
private final static long maxSequenceNo=0x7FFFFFFF;
-
+ private final static Random rand=new Random();
/**
* compare seq1 and seq2. Returns zero, if they are equal, a negative value if seq1 is smaller than
* seq2, and a positive value if seq1 is larger than seq2.
@@ -67,7 +67,7 @@
* generates a random number between 1 and 0x3FFFFFFF (inclusive)
*/
public static long random(){
- return 1+new Random().nextInt(maxOffset);
+ return 1+rand.nextInt(maxOffset);
}
}
Modified: udt-java/trunk/src/test/java/echo/EchoServer.java
===================================================================
--- udt-java/trunk/src/test/java/echo/EchoServer.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/test/java/echo/EchoServer.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -78,7 +78,7 @@
String line=readLine(in);
if(line!=null){
System.out.println("ECHO: "+line);
- //else echo back the line
+ //echo back the line
writer.println(line);
writer.flush();
}
Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -101,7 +101,6 @@
try{
for(int i=0;i<blocks.length;i++){
while(!is.haveNewData(i+1, blocks[i])){
- Thread.yield();
Thread.sleep(100);
}
}
Modified: udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java
===================================================================
--- udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-03-28 06:34:15 UTC (rev 73)
+++ udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-05-25 08:03:03 UTC (rev 74)
@@ -3,17 +3,20 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import udt.UDTPacket;
+import udt.util.SequenceNumber;
public class TestPacketFactory {
@Test
- public void testData(){
+ public void testData()throws IOException{
String test="sdjfsdjfldskjflds";
byte[]data=test.getBytes();
@@ -26,7 +29,7 @@
}
@Test
- public void testConnectionHandshake(){
+ public void testConnectionHandshake()throws IOException{
ConnectionHandshake p1 = new ConnectionHandshake();
p1.setMessageNumber(9876);
p1.setTimeStamp(3456);
@@ -39,8 +42,9 @@
p1.setMaxFlowWndSize(128);
p1.setSocketID(1);
p1.setUdtVersion(4);
-
-
+ p1.setAddress(InetAddress.getLocalHost());
+ p1.setCookie(SequenceNumber.random());
+
byte[]p1_data=p1.getEncoded();
UDTPacket p=PacketFactory.createPacket(p1_data);
@@ -50,7 +54,7 @@
}
@Test
- public void testAcknowledgement(){
+ public void testAcknowledgement()throws IOException{
Acknowledgement p1 = new Acknowledgement();
p1.setAckSequenceNumber(1234);
p1.setMessageNumber(9876);
@@ -70,7 +74,7 @@
}
@Test
- public void testAcknowledgementOfAcknowledgement(){
+ public void testAcknowledgementOfAcknowledgement()throws IOException{
Acknowledgment2 p1 = new Acknowledgment2();
p1.setAckSequenceNumber(1230);
p1.setMessageNumber(9871);
@@ -86,7 +90,7 @@
}
@Test
- public void testNegativeAcknowledgement(){
+ public void testNegativeAcknowledgement()throws IOException{
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
p1.setMessageNumber(9872);
p1.setTimeStamp(3452);
@@ -105,7 +109,7 @@
}
@Test
- public void testNegativeAcknowledgement2(){
+ public void testNegativeAcknowledgement2()throws IOException{
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
p1.setMessageNumber(9872);
p1.setTimeStamp(3452);
@@ -130,7 +134,7 @@
}
@Test
- public void testNegativeAcknowledgement3(){
+ public void testNegativeAcknowledgement3()throws IOException{
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
p1.setMessageNumber(9872);
p1.setTimeStamp(3452);
@@ -148,13 +152,12 @@
}
@Test
- public void testShutdown(){
+ public void testShutdown()throws IOException{
Shutdown p1 = new Shutdown();
p1.setMessageNumber(9874);
p1.setTimeStamp(3453);
p1.setDestinationID(3);
-
byte[]p1_data=p1.getEncoded();
UDTPacket p=PacketFactory.createPacket(p1_data);
@@ -164,7 +167,7 @@
@Test
- public void testMessageDropRequest(){
+ public void testMessageDropRequest()throws Exception{
MessageDropRequest p1=new MessageDropRequest();
p1.setMessageNumber(9876);
p1.setTimeStamp(3456);
@@ -181,5 +184,15 @@
MessageDropRequest p2=(MessageDropRequest)p;
assertEquals(p1,p2);
}
+
+ /**
+ * Round trip check: encodes the local host address with PacketUtil and
+ * decodes it again, expecting an equal address.
+ */
+ @Test
+ public void testPacketUtil()throws Exception{
+ InetAddress i=InetAddress.getLocalHost();
+ byte[]enc=PacketUtil.encode(i);
+ PacketUtil.print(enc);
+ //the local host may resolve to an IPv6 address (16 raw bytes), so the
+ //decode flag must match the address that was actually encoded
+ boolean ipV6=i.getAddress().length==16;
+ InetAddress i2=PacketUtil.decodeInetAddress(enc, 0, ipV6);
+ System.out.println(i2);
+ assertEquals(i, i2);
+ }
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|