[Udt-java-commits] SF.net SVN: udt-java:[45] udt-java/trunk/src/main/java/udt
Status: Alpha
Brought to you by: bschuller
From: <bsc...@us...> - 2010-09-01 12:51:36
Revision: 45 http://udt-java.svn.sourceforge.net/udt-java/?rev=45&view=rev Author: bschuller Date: 2010-09-01 12:51:29 +0000 (Wed, 01 Sep 2010) Log Message: ----------- send/recv file works with c++ version Modified Paths: -------------- udt-java/trunk/src/main/java/udt/ClientSession.java udt-java/trunk/src/main/java/udt/ServerSession.java udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTClient.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java udt-java/trunk/src/main/java/udt/packets/Destination.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java Modified: udt-java/trunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ClientSession.java 2010-08-31 09:34:20 UTC (rev 44) +++ udt-java/trunk/src/main/java/udt/ClientSession.java 2010-09-01 12:51:29 UTC (rev 45) @@ -43,18 +43,15 @@ import udt.util.SequenceNumber; /** - * Keep state of a UDT connection. Once established, the - * session provides a valid {@link UDTSocket}. - * This can be used as client session in both client-server mode and rendezvous mode. - * - * + * Client side of a client-server UDT connection. + * Once established, the session provides a valid {@link UDTSocket}. 
*/ public class ClientSession extends UDTSession { private static final Logger logger=Logger.getLogger(ClientSession.class.getName()); private UDPEndPoint endPoint; - + public ClientSession(UDPEndPoint endPoint, Destination dest)throws SocketException{ super("ClientSession localPort="+endPoint.getLocalPort(),dest); this.endPoint=endPoint; @@ -67,11 +64,11 @@ * @throws InterruptedException * @throws IOException */ - + public void connect() throws InterruptedException,IOException{ int n=0; - sendHandShake(); while(getState()!=ready){ + sendHandShake(); if(getState()==invalid)throw new IOException("Can't connect!"); n++; if(getState()!=ready)Thread.sleep(500); @@ -79,28 +76,48 @@ cc.init(); logger.info("Connected, "+n+" handshake packets sent"); } - + @Override public void received(UDTPacket packet, Destination peer) { - + lastPacket=packet; - - if (getState()!=ready && packet instanceof ConnectionHandshake) { - try{ - logger.info("Received connection handshake from "+peer); - //TODO validate parameters sent by peer - setState(ready); - long peerSocketID=((ConnectionHandshake)packet).getSocketID(); - destination.setSocketID(peerSocketID); - socket=new UDTSocket(endPoint,this); - }catch(Exception ex){ - logger.log(Level.WARNING,"Error creating socket",ex); - setState(invalid); + + if (packet instanceof ConnectionHandshake) { + ConnectionHandshake hs=(ConnectionHandshake)packet; + + logger.info("Received connection handshake from "+peer+"\n"+hs); + + if (getState()!=ready) { + if(hs.getConnectionType()==1){ + try{ + //TODO validate parameters sent by peer + long peerSocketID=hs.getSocketID(); + destination.setSocketID(peerSocketID); + sendConfirmation(hs); + }catch(Exception ex){ + logger.log(Level.WARNING,"Error creating socket",ex); + setState(invalid); + } + return; + } + else{ + try{ + //TODO validate parameters sent by peer + long peerSocketID=hs.getSocketID(); + destination.setSocketID(peerSocketID); + setState(ready); + socket=new UDTSocket(endPoint,this); + 
}catch(Exception ex){ + logger.log(Level.WARNING,"Error creating socket",ex); + setState(invalid); + } + return; + } } - return; } + if(getState() == ready) { - + if(packet instanceof Shutdown){ setState(shutdown); active=false; @@ -120,27 +137,41 @@ setState(invalid); } return; - } + } } //handshake for connect protected void sendHandShake()throws IOException{ ConnectionHandshake handshake = new ConnectionHandshake(); - handshake.setConnectionType(1); - handshake.setSocketType(1); + handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR); + handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM); long initialSequenceNo=SequenceNumber.random(); setInitialSequenceNumber(initialSequenceNo); handshake.setInitialSeqNo(initialSequenceNo); handshake.setPacketSize(getDatagramSize()); handshake.setSocketID(mySocketID); + handshake.setMaxFlowWndSize(flowWindowSize); handshake.setSession(this); logger.info("Sending "+handshake); endPoint.doSend(handshake); } - - + //2nd handshake for connect + protected void sendConfirmation(ConnectionHandshake hs)throws IOException{ + ConnectionHandshake handshake = new ConnectionHandshake(); + handshake.setConnectionType(-1); + handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM); + handshake.setInitialSeqNo(hs.getInitialSeqNo()); + handshake.setPacketSize(hs.getPacketSize()); + handshake.setSocketID(mySocketID); + handshake.setMaxFlowWndSize(flowWindowSize); + handshake.setSession(this); + logger.info("Sending confirmation "+handshake); + endPoint.doSend(handshake); + } + + public UDTPacket getLastPkt(){ return lastPacket; } Modified: udt-java/trunk/src/main/java/udt/ServerSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ServerSession.java 2010-08-31 09:34:20 UTC (rev 44) +++ udt-java/trunk/src/main/java/udt/ServerSession.java 2010-09-01 12:51:29 UTC (rev 45) @@ -67,33 +67,37 @@ @Override public void received(UDTPacket packet, 
Destination peer){ lastPacket=packet; - if (getState()<=ready && packet instanceof ConnectionHandshake) { + + if(packet instanceof ConnectionHandshake) { ConnectionHandshake connectionHandshake=(ConnectionHandshake)packet; - destination.setSocketID(connectionHandshake.getSocketID()); - logger.info("Received "+connectionHandshake); - - if(getState()<=handshaking){ - setState(handshaking); - } - try{ - handleHandShake(connectionHandshake); - n_handshake++; + + if (getState()<=ready){ + destination.setSocketID(connectionHandshake.getSocketID()); + + if(getState()<=handshaking){ + setState(handshaking); + } try{ - setState(ready); - socket=new UDTSocket(endPoint, this); - cc.init(); - }catch(Exception uhe){ - //session is invalid - logger.log(Level.SEVERE,"",uhe); + handleHandShake(connectionHandshake); + n_handshake++; + try{ + setState(ready); + socket=new UDTSocket(endPoint, this); + cc.init(); + }catch(Exception uhe){ + //session is invalid + logger.log(Level.SEVERE,"",uhe); + setState(invalid); + } + }catch(IOException ex){ + //session invalid + logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex); setState(invalid); } - }catch(IOException ex){ - //session invalid - logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex); - setState(invalid); + return; } - return; + }else if(packet instanceof KeepAlive) { socket.getReceiver().resetEXPTimer(); active = true; Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-08-31 09:34:20 UTC (rev 44) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-09-01 12:51:29 UTC (rev 45) @@ -67,9 +67,6 @@ //active sessions keyed by socket ID private final Map<Long,UDTSession>sessions=new ConcurrentHashMap<Long, UDTSession>(); - //connecting sessions keyed by peer destination - private final Map<Destination,UDTSession>clientSessions=new ConcurrentHashMap<Destination, 
UDTSession>();; - //last received packet private UDTPacket lastPacket; @@ -82,7 +79,7 @@ //has the endpoint been stopped? private volatile boolean stopped=false; - public static final int DATAGRAM_SIZE=1200; + public static final int DATAGRAM_SIZE=1400; /** * create an endpoint on the given socket @@ -206,14 +203,6 @@ sessions.put(destinationID, session); } - public void addClientSession(Destination peer,UDTSession session){ - clientSessions.put(peer, session); - } - - public void removeClientSession(Destination peer){ - clientSessions.remove(peer); - } - public UDTSession getSession(Long destinationID){ return sessions.get(destinationID); } @@ -250,6 +239,8 @@ //MeanValue v=new MeanValue("receiver processing ",true, 256); + private int n=0; + private final Object lock=new Object(); protected void doReceive()throws IOException{ @@ -271,7 +262,8 @@ //handle connection handshake if(packet.isConnectionHandshake()){ synchronized(lock){ - UDTSession session=clientSessions.get(peer); + Long id=Long.valueOf(packet.getDestinationID()); + UDTSession session=sessions.get(id); if(session==null){ session=new ServerSession(dp,this); addSession(session.getSocketID(),session); @@ -299,7 +291,10 @@ lastDestID=dest; } if(session==null){ - logger.warning("Unknown session <"+packet.getDestinationID()+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); + n++; + if(n%100==1){ + logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); + } } else{ session.received(packet,peer); Modified: udt-java/trunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTClient.java 2010-08-31 09:34:20 UTC (rev 44) +++ udt-java/trunk/src/main/java/udt/UDTClient.java 2010-09-01 12:51:29 UTC (rev 45) @@ -78,7 +78,6 @@ Destination destination=new Destination(address,port); //create client session... 
clientSession=new ClientSession(clientEndpoint,destination); - clientEndpoint.addClientSession(destination, clientSession); clientEndpoint.addSession(clientSession.getSocketID(), clientSession); clientEndpoint.start(); Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2010-08-31 09:34:20 UTC (rev 44) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2010-09-01 12:51:29 UTC (rev 45) @@ -33,6 +33,7 @@ package udt; import java.net.DatagramPacket; +import java.util.Random; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -73,7 +74,7 @@ * flow window size, i.e. how many data packets are * in-flight at a single time */ - protected int flowWindowSize=4*128; + protected int flowWindowSize=8192;//4*128; /** * remote UDT entity (address and socket ID) @@ -104,7 +105,7 @@ protected final long mySocketID; - private final static AtomicLong nextSocketID=new AtomicLong(0); + private final static AtomicLong nextSocketID=new AtomicLong(20+new Random().nextInt(5000)); public UDTSession(String description, Destination destination){ statistics=new UDTStatistics(description); @@ -220,4 +221,14 @@ public DatagramPacket getDatagram(){ return dgPacket; } + + public String toString(){ + StringBuilder sb=new StringBuilder(); + sb.append(super.toString()); + sb.append(" ["); + sb.append("socketID=").append(this.mySocketID); + sb.append(" ]"); + return sb.toString(); + } + } Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java 2010-08-31 09:34:20 UTC (rev 44) +++ udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java 2010-09-01 12:51:29 UTC (rev 45) @@ -72,9 +72,9 @@ void decodeControlInformation(byte[] data){ 
ackNumber=PacketUtil.decode(data, 0); if(data.length>4){ - roundTripTime =PacketUtil.decode(data, 4); - roundTripTimeVariance = PacketUtil.decode(data, 8); - bufferSize = PacketUtil.decode(data, 12); + roundTripTime =PacketUtil.decode(data, 4); + roundTripTimeVariance = PacketUtil.decode(data, 8); + bufferSize = PacketUtil.decode(data, 12); } if(data.length>16){ pktArrivalSpeed = PacketUtil.decode(data, 16); Modified: udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2010-08-31 09:34:20 UTC (rev 44) +++ udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2010-09-01 12:51:29 UTC (rev 45) @@ -40,19 +40,25 @@ private long udtVersion=4; public static final long SOCKET_TYPE_STREAM=0; + public static final long SOCKET_TYPE_DGRAM=1; - private long socketType= SOCKET_TYPE_STREAM;//STREAM OR DGRAM + private long socketType= SOCKET_TYPE_DGRAM; //stream or dgram + private long initialSeqNo = 0; private long packetSize; private long maxFlowWndSize; - public static final long CONNECTION_TYPE_REGULAR=0; - public static final long CONNECTION_TYPE_RENDEZVOUS=1; - private long connectionType = 0;//regular or rendezvous mode + public static final long CONNECTION_TYPE_REGULAR=1; + public static final long CONNECTION_TYPE_RENDEZVOUS=0; + + private long connectionType = CONNECTION_TYPE_REGULAR;//regular or rendezvous mode + private long socketID; + private long cookie=0; + public ConnectionHandshake(){ this.controlPacketType=ControlPacketType.CONNECTION_HANDSHAKE.ordinal(); } @@ -75,6 +81,9 @@ maxFlowWndSize=PacketUtil.decode(data, 16); connectionType=PacketUtil.decode(data, 20); socketID=PacketUtil.decode(data, 24); + if(data.length>28){ + cookie=PacketUtil.decode(data, 28); + } } public long getUdtVersion() { @@ -176,16 +185,19 @@ public String toString(){ StringBuilder sb=new StringBuilder(); 
sb.append("ConnectionHandshake ["); + sb.append("connectionType=").append(connectionType); UDTSession session=getSession(); if(session!=null){ + sb.append(", "); sb.append(session.getDestination()); - sb.append(", "); } - sb.append("mySocketID=").append(socketID); + sb.append(", mySocketID=").append(socketID); sb.append(", initialSeqNo=").append(initialSeqNo); sb.append(", packetSize=").append(packetSize); sb.append(", maxFlowWndSize=").append(maxFlowWndSize); + sb.append(", socketType=").append(socketType); sb.append(", destSocketID=").append(destinationID); + if(cookie>0)sb.append(", cookie=").append(cookie); sb.append("]"); return sb.toString(); } Modified: udt-java/trunk/src/main/java/udt/packets/Destination.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/Destination.java 2010-08-31 09:34:20 UTC (rev 44) +++ udt-java/trunk/src/main/java/udt/packets/Destination.java 2010-09-01 12:51:29 UTC (rev 45) @@ -65,8 +65,8 @@ } public String toString(){ - return("Destination: "+address.getHostName()+" port="+port+" socketID="+socketID); - } + return("Destination ["+address.getHostName()+" port="+port+" socketID="+socketID)+"]"; + } @Override public int hashCode() { Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-08-31 09:34:20 UTC (rev 44) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-09-01 12:51:29 UTC (rev 45) @@ -88,11 +88,10 @@ out.write(nameinfo); out.flush(); - //pause the sender to save some CPU time out.pauseOutput(); - //read size info (an 4-byte int) + //read size info (an 64 bit number) byte[]sizeInfo=new byte[8]; int total=0; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |