[Udt-java-commits] SF.net SVN: udt-java:[45] udt-java/trunk/src/main/java/udt
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2010-09-01 12:51:36
|
Revision: 45
http://udt-java.svn.sourceforge.net/udt-java/?rev=45&view=rev
Author: bschuller
Date: 2010-09-01 12:51:29 +0000 (Wed, 01 Sep 2010)
Log Message:
-----------
send/recv file works with c++ version
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/ClientSession.java
udt-java/trunk/src/main/java/udt/ServerSession.java
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTClient.java
udt-java/trunk/src/main/java/udt/UDTSession.java
udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java
udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java
udt-java/trunk/src/main/java/udt/packets/Destination.java
udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
Modified: udt-java/trunk/src/main/java/udt/ClientSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/ClientSession.java 2010-08-31 09:34:20 UTC (rev 44)
+++ udt-java/trunk/src/main/java/udt/ClientSession.java 2010-09-01 12:51:29 UTC (rev 45)
@@ -43,18 +43,15 @@
import udt.util.SequenceNumber;
/**
- * Keep state of a UDT connection. Once established, the
- * session provides a valid {@link UDTSocket}.
- * This can be used as client session in both client-server mode and rendezvous mode.
- *
- *
+ * Client side of a client-server UDT connection.
+ * Once established, the session provides a valid {@link UDTSocket}.
*/
public class ClientSession extends UDTSession {
private static final Logger logger=Logger.getLogger(ClientSession.class.getName());
private UDPEndPoint endPoint;
-
+
public ClientSession(UDPEndPoint endPoint, Destination dest)throws SocketException{
super("ClientSession localPort="+endPoint.getLocalPort(),dest);
this.endPoint=endPoint;
@@ -67,11 +64,11 @@
* @throws InterruptedException
* @throws IOException
*/
-
+
public void connect() throws InterruptedException,IOException{
int n=0;
- sendHandShake();
while(getState()!=ready){
+ sendHandShake();
if(getState()==invalid)throw new IOException("Can't connect!");
n++;
if(getState()!=ready)Thread.sleep(500);
@@ -79,28 +76,48 @@
cc.init();
logger.info("Connected, "+n+" handshake packets sent");
}
-
+
@Override
public void received(UDTPacket packet, Destination peer) {
-
+
lastPacket=packet;
-
- if (getState()!=ready && packet instanceof ConnectionHandshake) {
- try{
- logger.info("Received connection handshake from "+peer);
- //TODO validate parameters sent by peer
- setState(ready);
- long peerSocketID=((ConnectionHandshake)packet).getSocketID();
- destination.setSocketID(peerSocketID);
- socket=new UDTSocket(endPoint,this);
- }catch(Exception ex){
- logger.log(Level.WARNING,"Error creating socket",ex);
- setState(invalid);
+
+ if (packet instanceof ConnectionHandshake) {
+ ConnectionHandshake hs=(ConnectionHandshake)packet;
+
+ logger.info("Received connection handshake from "+peer+"\n"+hs);
+
+ if (getState()!=ready) {
+ if(hs.getConnectionType()==1){
+ try{
+ //TODO validate parameters sent by peer
+ long peerSocketID=hs.getSocketID();
+ destination.setSocketID(peerSocketID);
+ sendConfirmation(hs);
+ }catch(Exception ex){
+ logger.log(Level.WARNING,"Error creating socket",ex);
+ setState(invalid);
+ }
+ return;
+ }
+ else{
+ try{
+ //TODO validate parameters sent by peer
+ long peerSocketID=hs.getSocketID();
+ destination.setSocketID(peerSocketID);
+ setState(ready);
+ socket=new UDTSocket(endPoint,this);
+ }catch(Exception ex){
+ logger.log(Level.WARNING,"Error creating socket",ex);
+ setState(invalid);
+ }
+ return;
+ }
}
- return;
}
+
if(getState() == ready) {
-
+
if(packet instanceof Shutdown){
setState(shutdown);
active=false;
@@ -120,27 +137,41 @@
setState(invalid);
}
return;
- }
+ }
}
//handshake for connect
protected void sendHandShake()throws IOException{
ConnectionHandshake handshake = new ConnectionHandshake();
- handshake.setConnectionType(1);
- handshake.setSocketType(1);
+ handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR);
+ handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM);
long initialSequenceNo=SequenceNumber.random();
setInitialSequenceNumber(initialSequenceNo);
handshake.setInitialSeqNo(initialSequenceNo);
handshake.setPacketSize(getDatagramSize());
handshake.setSocketID(mySocketID);
+ handshake.setMaxFlowWndSize(flowWindowSize);
handshake.setSession(this);
logger.info("Sending "+handshake);
endPoint.doSend(handshake);
}
-
-
+ //2nd handshake for connect
+ protected void sendConfirmation(ConnectionHandshake hs)throws IOException{
+ ConnectionHandshake handshake = new ConnectionHandshake();
+ handshake.setConnectionType(-1);
+ handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM);
+ handshake.setInitialSeqNo(hs.getInitialSeqNo());
+ handshake.setPacketSize(hs.getPacketSize());
+ handshake.setSocketID(mySocketID);
+ handshake.setMaxFlowWndSize(flowWindowSize);
+ handshake.setSession(this);
+ logger.info("Sending confirmation "+handshake);
+ endPoint.doSend(handshake);
+ }
+
+
public UDTPacket getLastPkt(){
return lastPacket;
}
Modified: udt-java/trunk/src/main/java/udt/ServerSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/ServerSession.java 2010-08-31 09:34:20 UTC (rev 44)
+++ udt-java/trunk/src/main/java/udt/ServerSession.java 2010-09-01 12:51:29 UTC (rev 45)
@@ -67,33 +67,37 @@
@Override
public void received(UDTPacket packet, Destination peer){
lastPacket=packet;
- if (getState()<=ready && packet instanceof ConnectionHandshake) {
+
+ if(packet instanceof ConnectionHandshake) {
ConnectionHandshake connectionHandshake=(ConnectionHandshake)packet;
- destination.setSocketID(connectionHandshake.getSocketID());
-
logger.info("Received "+connectionHandshake);
-
- if(getState()<=handshaking){
- setState(handshaking);
- }
- try{
- handleHandShake(connectionHandshake);
- n_handshake++;
+
+ if (getState()<=ready){
+ destination.setSocketID(connectionHandshake.getSocketID());
+
+ if(getState()<=handshaking){
+ setState(handshaking);
+ }
try{
- setState(ready);
- socket=new UDTSocket(endPoint, this);
- cc.init();
- }catch(Exception uhe){
- //session is invalid
- logger.log(Level.SEVERE,"",uhe);
+ handleHandShake(connectionHandshake);
+ n_handshake++;
+ try{
+ setState(ready);
+ socket=new UDTSocket(endPoint, this);
+ cc.init();
+ }catch(Exception uhe){
+ //session is invalid
+ logger.log(Level.SEVERE,"",uhe);
+ setState(invalid);
+ }
+ }catch(IOException ex){
+ //session invalid
+ logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex);
setState(invalid);
}
- }catch(IOException ex){
- //session invalid
- logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex);
- setState(invalid);
+ return;
}
- return;
+
}else if(packet instanceof KeepAlive) {
socket.getReceiver().resetEXPTimer();
active = true;
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-08-31 09:34:20 UTC (rev 44)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-09-01 12:51:29 UTC (rev 45)
@@ -67,9 +67,6 @@
//active sessions keyed by socket ID
private final Map<Long,UDTSession>sessions=new ConcurrentHashMap<Long, UDTSession>();
- //connecting sessions keyed by peer destination
- private final Map<Destination,UDTSession>clientSessions=new ConcurrentHashMap<Destination, UDTSession>();;
-
//last received packet
private UDTPacket lastPacket;
@@ -82,7 +79,7 @@
//has the endpoint been stopped?
private volatile boolean stopped=false;
- public static final int DATAGRAM_SIZE=1200;
+ public static final int DATAGRAM_SIZE=1400;
/**
* create an endpoint on the given socket
@@ -206,14 +203,6 @@
sessions.put(destinationID, session);
}
- public void addClientSession(Destination peer,UDTSession session){
- clientSessions.put(peer, session);
- }
-
- public void removeClientSession(Destination peer){
- clientSessions.remove(peer);
- }
-
public UDTSession getSession(Long destinationID){
return sessions.get(destinationID);
}
@@ -250,6 +239,8 @@
//MeanValue v=new MeanValue("receiver processing ",true, 256);
+ private int n=0;
+
private final Object lock=new Object();
protected void doReceive()throws IOException{
@@ -271,7 +262,8 @@
//handle connection handshake
if(packet.isConnectionHandshake()){
synchronized(lock){
- UDTSession session=clientSessions.get(peer);
+ Long id=Long.valueOf(packet.getDestinationID());
+ UDTSession session=sessions.get(id);
if(session==null){
session=new ServerSession(dp,this);
addSession(session.getSocketID(),session);
@@ -299,7 +291,10 @@
lastDestID=dest;
}
if(session==null){
- logger.warning("Unknown session <"+packet.getDestinationID()+"> requested from <"+peer+"> packet type "+packet.getClass().getName());
+ n++;
+ if(n%100==1){
+ logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName());
+ }
}
else{
session.received(packet,peer);
Modified: udt-java/trunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTClient.java 2010-08-31 09:34:20 UTC (rev 44)
+++ udt-java/trunk/src/main/java/udt/UDTClient.java 2010-09-01 12:51:29 UTC (rev 45)
@@ -78,7 +78,6 @@
Destination destination=new Destination(address,port);
//create client session...
clientSession=new ClientSession(clientEndpoint,destination);
- clientEndpoint.addClientSession(destination, clientSession);
clientEndpoint.addSession(clientSession.getSocketID(), clientSession);
clientEndpoint.start();
Modified: udt-java/trunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-08-31 09:34:20 UTC (rev 44)
+++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-09-01 12:51:29 UTC (rev 45)
@@ -33,6 +33,7 @@
package udt;
import java.net.DatagramPacket;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -73,7 +74,7 @@
* flow window size, i.e. how many data packets are
* in-flight at a single time
*/
- protected int flowWindowSize=4*128;
+ protected int flowWindowSize=8192;//4*128;
/**
* remote UDT entity (address and socket ID)
@@ -104,7 +105,7 @@
protected final long mySocketID;
- private final static AtomicLong nextSocketID=new AtomicLong(0);
+ private final static AtomicLong nextSocketID=new AtomicLong(20+new Random().nextInt(5000));
public UDTSession(String description, Destination destination){
statistics=new UDTStatistics(description);
@@ -220,4 +221,14 @@
public DatagramPacket getDatagram(){
return dgPacket;
}
+
+ public String toString(){
+ StringBuilder sb=new StringBuilder();
+ sb.append(super.toString());
+ sb.append(" [");
+ sb.append("socketID=").append(this.mySocketID);
+ sb.append(" ]");
+ return sb.toString();
+ }
+
}
Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java 2010-08-31 09:34:20 UTC (rev 44)
+++ udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java 2010-09-01 12:51:29 UTC (rev 45)
@@ -72,9 +72,9 @@
void decodeControlInformation(byte[] data){
ackNumber=PacketUtil.decode(data, 0);
if(data.length>4){
- roundTripTime =PacketUtil.decode(data, 4);
- roundTripTimeVariance = PacketUtil.decode(data, 8);
- bufferSize = PacketUtil.decode(data, 12);
+ roundTripTime =PacketUtil.decode(data, 4);
+ roundTripTimeVariance = PacketUtil.decode(data, 8);
+ bufferSize = PacketUtil.decode(data, 12);
}
if(data.length>16){
pktArrivalSpeed = PacketUtil.decode(data, 16);
Modified: udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2010-08-31 09:34:20 UTC (rev 44)
+++ udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2010-09-01 12:51:29 UTC (rev 45)
@@ -40,19 +40,25 @@
private long udtVersion=4;
public static final long SOCKET_TYPE_STREAM=0;
+
public static final long SOCKET_TYPE_DGRAM=1;
- private long socketType= SOCKET_TYPE_STREAM;//STREAM OR DGRAM
+ private long socketType= SOCKET_TYPE_DGRAM; //stream or dgram
+
private long initialSeqNo = 0;
private long packetSize;
private long maxFlowWndSize;
- public static final long CONNECTION_TYPE_REGULAR=0;
- public static final long CONNECTION_TYPE_RENDEZVOUS=1;
- private long connectionType = 0;//regular or rendezvous mode
+ public static final long CONNECTION_TYPE_REGULAR=1;
+ public static final long CONNECTION_TYPE_RENDEZVOUS=0;
+
+ private long connectionType = CONNECTION_TYPE_REGULAR;//regular or rendezvous mode
+
private long socketID;
+ private long cookie=0;
+
public ConnectionHandshake(){
this.controlPacketType=ControlPacketType.CONNECTION_HANDSHAKE.ordinal();
}
@@ -75,6 +81,9 @@
maxFlowWndSize=PacketUtil.decode(data, 16);
connectionType=PacketUtil.decode(data, 20);
socketID=PacketUtil.decode(data, 24);
+ if(data.length>28){
+ cookie=PacketUtil.decode(data, 28);
+ }
}
public long getUdtVersion() {
@@ -176,16 +185,19 @@
public String toString(){
StringBuilder sb=new StringBuilder();
sb.append("ConnectionHandshake [");
+ sb.append("connectionType=").append(connectionType);
UDTSession session=getSession();
if(session!=null){
+ sb.append(", ");
sb.append(session.getDestination());
- sb.append(", ");
}
- sb.append("mySocketID=").append(socketID);
+ sb.append(", mySocketID=").append(socketID);
sb.append(", initialSeqNo=").append(initialSeqNo);
sb.append(", packetSize=").append(packetSize);
sb.append(", maxFlowWndSize=").append(maxFlowWndSize);
+ sb.append(", socketType=").append(socketType);
sb.append(", destSocketID=").append(destinationID);
+ if(cookie>0)sb.append(", cookie=").append(cookie);
sb.append("]");
return sb.toString();
}
Modified: udt-java/trunk/src/main/java/udt/packets/Destination.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/Destination.java 2010-08-31 09:34:20 UTC (rev 44)
+++ udt-java/trunk/src/main/java/udt/packets/Destination.java 2010-09-01 12:51:29 UTC (rev 45)
@@ -65,8 +65,8 @@
}
public String toString(){
- return("Destination: "+address.getHostName()+" port="+port+" socketID="+socketID);
- }
+ return("Destination ["+address.getHostName()+" port="+port+" socketID="+socketID)+"]";
+ }
@Override
public int hashCode() {
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-08-31 09:34:20 UTC (rev 44)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-09-01 12:51:29 UTC (rev 45)
@@ -88,11 +88,10 @@
out.write(nameinfo);
out.flush();
-
//pause the sender to save some CPU time
out.pauseOutput();
- //read size info (an 4-byte int)
+ //read size info (a 64-bit number)
byte[]sizeInfo=new byte[8];
int total=0;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|