[Udt-java-commits] SF.net SVN: udt-java:[74] udt-java/trunk
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2012-05-25 08:03:15
|
Revision: 74 http://udt-java.svn.sourceforge.net/udt-java/?rev=74&view=rev Author: bschuller Date: 2012-05-25 08:03:03 +0000 (Fri, 25 May 2012) Log Message: ----------- attempt to switch to new UDT-C++ style of connection handshake. DOES NOT WORK yet Modified Paths: -------------- udt-java/trunk/pom.xml udt-java/trunk/src/main/java/udt/ClientSession.java udt-java/trunk/src/main/java/udt/ServerSession.java udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTClient.java udt-java/trunk/src/main/java/udt/UDTInputStream.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTServerSocket.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java udt-java/trunk/src/main/java/udt/packets/PacketFactory.java udt-java/trunk/src/main/java/udt/packets/PacketUtil.java udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/trunk/src/main/java/udt/util/SequenceNumber.java udt-java/trunk/src/test/java/echo/EchoServer.java udt-java/trunk/src/test/java/udt/TestUDTInputStream.java udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java Modified: udt-java/trunk/pom.xml =================================================================== --- udt-java/trunk/pom.xml 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/pom.xml 2012-05-25 08:03:03 UTC (rev 74) @@ -5,7 +5,7 @@ <artifactId>udt-java</artifactId> <packaging>jar</packaging> <name>UDT Java implementation</name> - <version>0.6-SNAPSHOT</version> + <version>0.7-SNAPSHOT</version> <url>http://sourceforge.net/projects/udt-java</url> <developers> <developer> Modified: udt-java/trunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ClientSession.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/ClientSession.java 2012-05-25 08:03:03 UTC (rev 74) @@ -52,6 +52,8 @@ private UDPEndPoint endPoint; + long initialSequenceNo=SequenceNumber.random(); + public ClientSession(UDPEndPoint endPoint, Destination dest)throws SocketException{ super("ClientSession localPort="+endPoint.getLocalPort(),dest); this.endPoint=endPoint; @@ -60,7 +62,7 @@ /** * send connection handshake until a reply from server is received - * TODO check for timeout + * @throws InterruptedException * @throws IOException */ @@ -68,11 +70,21 @@ public void connect() throws InterruptedException,IOException{ int n=0; while(getState()!=ready){ - sendHandShake(); if(getState()==invalid)throw new IOException("Can't connect!"); - n++; - if(getState()!=ready)Thread.sleep(500); + if(getState()<=handshaking){ + setState(handshaking); + sendInitialHandShake(); + } + else if(getState()==handshaking+1){ + sendSecondHandshake(); + } + + if(getState()==invalid)throw new IOException("Can't connect!"); + if(n++ > 10)throw new IOException("Could not connect to server within the timeout."); + + Thread.sleep(500); } + Thread.sleep(1000); cc.init(); logger.info("Connected, "+n+" handshake packets sent"); } @@ -82,38 +94,10 @@ lastPacket=packet; - if (packet instanceof ConnectionHandshake) { + if (packet.isConnectionHandshake()) { ConnectionHandshake hs=(ConnectionHandshake)packet; - - logger.info("Received connection handshake from "+peer+"\n"+hs); - - if (getState()!=ready) { - if(hs.getConnectionType()==1){ - try{ - //TODO validate parameters sent by peer - long peerSocketID=hs.getSocketID(); - destination.setSocketID(peerSocketID); - sendConfirmation(hs); - }catch(Exception ex){ - logger.log(Level.WARNING,"Error creating socket",ex); - setState(invalid); - } - return; - } - else{ - try{ - //TODO validate parameters sent by peer - long peerSocketID=hs.getSocketID(); - destination.setSocketID(peerSocketID); - setState(ready); - socket=new UDTSocket(endPoint,this); - }catch(Exception ex){ - logger.log(Level.WARNING,"Error creating socket",ex); - setState(invalid); - } - return; - } - } + handleConnectionHandshake(hs,peer); + return; } if(getState() == ready) { @@ -140,9 +124,43 @@ } } + protected void handleConnectionHandshake(ConnectionHandshake hs, Destination peer){ - //handshake for connect - protected void sendHandShake()throws IOException{ + if (getState()==handshaking) { + logger.info("Received initial handshake response from "+peer+"\n"+hs); + if(hs.getConnectionType()==ConnectionHandshake.CONNECTION_SERVER_ACK){ + try{ + //TODO validate parameters sent by peer + long peerSocketID=hs.getSocketID(); + sessionCookie=hs.getCookie(); + destination.setSocketID(peerSocketID); + setState(handshaking+1); + }catch(Exception ex){ + logger.log(Level.WARNING,"Error creating socket",ex); + setState(invalid); + } + return; + } + else{ + logger.info("Unexpected type of handshake packet received"); + setState(invalid); + } + } + else if(getState()==handshaking+1){ + try{ + logger.info("Received confirmation handshake response from "+peer+"\n"+hs); + //TODO validate parameters sent by peer + setState(ready); + socket=new UDTSocket(endPoint,this); + }catch(Exception ex){ + logger.log(Level.WARNING,"Error creating socket",ex); + setState(invalid); + } + } + } + + //initial handshake for connect + protected void sendInitialHandShake()throws IOException{ ConnectionHandshake handshake = new ConnectionHandshake(); handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR); handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM); @@ -153,20 +171,24 @@ handshake.setSocketID(mySocketID); handshake.setMaxFlowWndSize(flowWindowSize); handshake.setSession(this); + handshake.setAddress(endPoint.getLocalAddress()); logger.info("Sending "+handshake); endPoint.doSend(handshake); } //2nd handshake for connect - protected void sendConfirmation(ConnectionHandshake hs)throws IOException{ + protected void sendSecondHandshake()throws IOException{ ConnectionHandshake handshake = new ConnectionHandshake(); - handshake.setConnectionType(-1); + handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR); handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM); - handshake.setInitialSeqNo(hs.getInitialSeqNo()); - handshake.setPacketSize(hs.getPacketSize()); + handshake.setInitialSeqNo(initialSequenceNo); + handshake.setPacketSize(getDatagramSize()); handshake.setSocketID(mySocketID); handshake.setMaxFlowWndSize(flowWindowSize); handshake.setSession(this); + handshake.setCookie(sessionCookie); + handshake.setAddress(endPoint.getLocalAddress()); + handshake.setDestinationID(getDestination().getSocketID()); logger.info("Sending confirmation "+handshake); endPoint.doSend(handshake); } Modified: udt-java/trunk/src/main/java/udt/ServerSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ServerSession.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/ServerSession.java 2012-05-25 08:03:03 UTC (rev 74) @@ -33,7 +33,6 @@ package udt; import java.io.IOException; -import java.net.DatagramPacket; import java.net.SocketException; import java.net.UnknownHostException; import java.util.logging.Level; @@ -43,6 +42,7 @@ import udt.packets.Destination; import udt.packets.KeepAlive; import udt.packets.Shutdown; +import udt.util.SequenceNumber; /** * server side session in client-server mode @@ -56,10 +56,10 @@ //last received packet (for testing purposes) private UDTPacket lastPacket; - public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{ - super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new Destination(dp.getAddress(),dp.getPort())); + public ServerSession(Destination peer, UDPEndPoint endPoint)throws SocketException,UnknownHostException{ + super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+peer.getAddress()+":"+peer.getPort(),peer); this.endPoint=endPoint; - logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort()); + logger.info("Created "+toString()+" talking to "+peer.getAddress()+":"+peer.getPort()); } int n_handshake=0; @@ -68,79 +68,45 @@ public void received(UDTPacket packet, Destination peer){ lastPacket=packet; - if(packet instanceof ConnectionHandshake) { - ConnectionHandshake connectionHandshake=(ConnectionHandshake)packet; - logger.info("Received "+connectionHandshake); + if(packet.isConnectionHandshake()) { + handleHandShake((ConnectionHandshake)packet); + return; + } - if (getState()<=ready){ - destination.setSocketID(connectionHandshake.getSocketID()); - - if(getState()<=handshaking){ - setState(handshaking); - } - try{ - handleHandShake(connectionHandshake); - n_handshake++; - try{ - setState(ready); - socket=new UDTSocket(endPoint, this); - cc.init(); - }catch(Exception uhe){ - //session is invalid - logger.log(Level.SEVERE,"",uhe); - setState(invalid); - } - }catch(IOException ex){ - //session invalid - logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex); - setState(invalid); - } - return; - } - - }else if(packet instanceof KeepAlive) { + if(packet instanceof KeepAlive) { socket.getReceiver().resetEXPTimer(); active = true; return; } - if(getState()== ready) { - active = true; - - if (packet instanceof KeepAlive) { - //nothing to do here - return; - }else if (packet instanceof Shutdown) { - try{ - socket.getReceiver().stop(); - }catch(IOException ex){ - logger.log(Level.WARNING,"",ex); - } - setState(shutdown); - System.out.println("SHUTDOWN ***"); - active = false; - logger.info("Connection shutdown initiated by the other side."); - return; + if (packet instanceof Shutdown) { + try{ + socket.getReceiver().stop(); + }catch(IOException ex){ + logger.log(Level.WARNING,"",ex); } + setState(shutdown); + active = false; + logger.info("Connection shutdown initiated by peer."); + return; + } - else{ - try{ - if(packet.forSender()){ - socket.getSender().receive(packet); - }else{ - socket.getReceiver().receive(packet); - } - }catch(Exception ex){ - //session invalid - logger.log(Level.SEVERE,"",ex); - setState(invalid); + if(getState() == ready) { + active = true; + try{ + if(packet.forSender()){ + socket.getSender().receive(packet); + }else{ + socket.getReceiver().receive(packet); } + }catch(Exception ex){ + //session invalid + logger.log(Level.SEVERE,"",ex); + setState(invalid); } return; - } - } /** @@ -151,16 +117,73 @@ } /** - * handle the connection handshake:<br/> - * <ul> - * <li>set initial sequence number</li> - * <li>send response handshake</li> - * </ul> + * reply to a connection handshake message + * @param connectionHandshake + */ + protected void handleHandShake(ConnectionHandshake connectionHandshake){ + logger.info("Received "+connectionHandshake + " in state <"+getState()+">"); + if(getState()==ready){ + //just send confirmation packet again + try{ + sendFinalHandShake(connectionHandshake); + }catch(IOException io){} + return; + } + + if (getState()<ready){ + destination.setSocketID(connectionHandshake.getSocketID()); + + if(getState()<handshaking){ + setState(handshaking); + } + + try{ + n_handshake++; + boolean handShakeComplete=handleSecondHandShake(connectionHandshake); + if(handShakeComplete){ + logger.info("Client/Server handshake complete!"); + setState(ready); + socket=new UDTSocket(endPoint, this); + cc.init(); + } + }catch(IOException ex){ + //session invalid + logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex); + setState(invalid); + } + } + } + + private ConnectionHandshake finalConnectionHandshake; + + /** + * handle the connection handshake + * * @param handshake * @param peer * @throws IOException */ - protected void handleHandShake(ConnectionHandshake handshake)throws IOException{ + protected boolean handleSecondHandShake(ConnectionHandshake handshake)throws IOException{ + if(sessionCookie==0){ + ackInitialHandshake(handshake); + //need one more handshake + return false; + } + + long otherCookie=handshake.getCookie(); + if(sessionCookie!=otherCookie){ + setState(invalid); + throw new IOException("Invalid cookie <"+otherCookie+"> received, my cookie is <"+sessionCookie+">"); + } + sendFinalHandShake(handshake); + return true; + } + + /* + * response after the initial connection handshake received: + * compute cookie + */ + protected void ackInitialHandshake(ConnectionHandshake handshake)throws IOException{ ConnectionHandshake responseHandshake = new ConnectionHandshake(); //compare the packet size and choose minimun long clientBufferSize=handshake.getPacketSize(); @@ -178,12 +201,40 @@ responseHandshake.setSocketID(mySocketID); responseHandshake.setDestinationID(this.getDestination().getSocketID()); responseHandshake.setSession(this); + sessionCookie=SequenceNumber.random(); + responseHandshake.setCookie(sessionCookie); + responseHandshake.setAddress(endPoint.getLocalAddress()); logger.info("Sending reply "+responseHandshake); endPoint.doSend(responseHandshake); } + protected void sendFinalHandShake(ConnectionHandshake handshake)throws IOException{ + if(finalConnectionHandshake==null){ + finalConnectionHandshake= new ConnectionHandshake(); + //compare the packet size and choose minimun + long clientBufferSize=handshake.getPacketSize(); + long myBufferSize=getDatagramSize(); + long bufferSize=Math.min(clientBufferSize, myBufferSize); + long initialSequenceNumber=handshake.getInitialSeqNo(); + setInitialSequenceNumber(initialSequenceNumber); + setDatagramSize((int)bufferSize); + finalConnectionHandshake.setPacketSize(bufferSize); + finalConnectionHandshake.setUdtVersion(4); + finalConnectionHandshake.setInitialSeqNo(initialSequenceNumber); + finalConnectionHandshake.setConnectionType(-1); + finalConnectionHandshake.setMaxFlowWndSize(handshake.getMaxFlowWndSize()); + //tell peer what the socket ID on this side is + finalConnectionHandshake.setSocketID(mySocketID); + finalConnectionHandshake.setDestinationID(this.getDestination().getSocketID()); + finalConnectionHandshake.setSession(this); + finalConnectionHandshake.setCookie(sessionCookie); + finalConnectionHandshake.setAddress(endPoint.getLocalAddress()); + } + logger.info("Sending final handshake ack "+finalConnectionHandshake); + endPoint.doSend(finalConnectionHandshake); + } } Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2012-05-25 08:03:03 UTC (rev 74) @@ -40,6 +40,8 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.SynchronousQueue; @@ -70,10 +72,12 @@ //last received packet private UDTPacket lastPacket; + private final Map<Destination,UDTSession> sessionsBeingConnected=Collections.synchronizedMap(new HashMap<Destination,UDTSession>()); + //if the endpoint is configured for a server socket, //this queue is used to handoff new UDTSessions to the application private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>(); - + private boolean serverSocketMode=false; //has the endpoint been stopped? @@ -90,7 +94,7 @@ this.dgSocket=socket; port=dgSocket.getLocalPort(); } - + /** * bind to any local port on the given host address * @param localAddress @@ -116,7 +120,7 @@ } if(localPort>0)this.port = localPort; else port=dgSocket.getLocalPort(); - + configureSocket(); } @@ -127,7 +131,7 @@ dgSocket.setReceiveBufferSize(128*1024); dgSocket.setReuseAddress(false); } - + /** * bind to the default network interface on the machine * @@ -240,79 +244,75 @@ * </ul> * @throws IOException */ - private long lastDestID=-1; - private UDTSession lastSession; - - private int n=0; - - private final Object lock=new Object(); - protected void doReceive()throws IOException{ while(!stopped){ try{ - try{ - - //will block until a packet is received or timeout has expired - dgSocket.receive(dp); - - Destination peer=new Destination(dp.getAddress(), dp.getPort()); - int l=dp.getLength(); - UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); - lastPacket=packet; + //will block until a packet is received or timeout has expired + dgSocket.receive(dp); - //handle connection handshake - if(packet.isConnectionHandshake()){ - synchronized(lock){ - Long id=Long.valueOf(packet.getDestinationID()); - UDTSession session=sessions.get(id); - if(session==null){ - session=new ServerSession(dp,this); - addSession(session.getSocketID(),session); - //TODO need to check peer to avoid duplicate server session - if(serverSocketMode){ - logger.fine("Pooling new request."); - sessionHandoff.put(session); - logger.fine("Request taken for processing."); - } - } - peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); - session.received(packet,peer); - } - } - else{ - //dispatch to existing session - long dest=packet.getDestinationID(); - UDTSession session; - if(dest==lastDestID){ - session=lastSession; - } - else{ - session=sessions.get(dest); - lastSession=session; - lastDestID=dest; - } - if(session==null){ - n++; - if(n%100==1){ - logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); - } - } - else{ - session.received(packet,peer); - } - } - }catch(SocketException ex){ - logger.log(Level.INFO, "SocketException: "+ex.getMessage()); - }catch(SocketTimeoutException ste){ - //can safely ignore... we will retry until the endpoint is stopped + Destination peer=new Destination(dp.getAddress(), dp.getPort()); + int l=dp.getLength(); + UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); + lastPacket=packet; + + long dest=packet.getDestinationID(); + UDTSession session=sessions.get(dest); + if(session!=null){ + //dispatch to existing session + session.received(packet,peer); } - + else if(packet.isConnectionHandshake()){ + connectionHandshake((ConnectionHandshake)packet, peer); + } + else{ + logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); + } + }catch(SocketException ex){ + logger.log(Level.INFO, "SocketException: "+ex.getMessage()); + }catch(SocketTimeoutException ste){ + //can safely ignore... we will retry until the endpoint is stopped }catch(Exception ex){ logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); } } } + /** + * called when a "connection handshake" packet was received and no + * matching session yet exists + * + * @param packet + * @param peer + * @throws IOException + * @throws InterruptedException + */ + protected synchronized void connectionHandshake(ConnectionHandshake packet, Destination peer)throws IOException, InterruptedException{ + Destination p=new Destination(peer.getAddress(),peer.getPort()); + UDTSession session=sessionsBeingConnected.get(peer); + long destID=packet.getDestinationID(); + if(session!=null && session.getSocketID()==destID){ + //confirmation handshake + sessionsBeingConnected.remove(p); + addSession(destID, session); + } + else if(session==null){ + session=new ServerSession(peer,this); + sessionsBeingConnected.put(p,session); + sessions.put(session.getSocketID(), session); + if(serverSocketMode){ + logger.fine("Pooling new request."); + sessionHandoff.put(session); + logger.fine("Request taken for processing."); + } + } + else { + throw new IOException("dest ID sent by client does not match"); + } + Long peerSocketID=((ConnectionHandshake)packet).getSocketID(); + peer.setSocketID(peerSocketID); + session.received(packet,peer); + } + protected void doSend(UDTPacket packet)throws IOException{ byte[]data=packet.getEncoded(); DatagramPacket dgp = packet.getSession().getDatagram(); @@ -327,4 +327,5 @@ public void sendRaw(DatagramPacket p)throws IOException{ dgSocket.send(p); } + } Modified: udt-java/trunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTClient.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTClient.java 2012-05-25 08:03:03 UTC (rev 74) @@ -80,12 +80,11 @@ //create client session... clientSession=new ClientSession(clientEndpoint,destination); clientEndpoint.addSession(clientSession.getSocketID(), clientSession); - clientEndpoint.start(); clientSession.connect(); //wait for handshake while(!clientSession.isReady()){ - Thread.sleep(5); + Thread.sleep(50); } logger.info("The UDTClient is connected"); } Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2012-05-25 08:03:03 UTC (rev 74) @@ -78,9 +78,19 @@ @Override public int read()throws IOException{ int b=0; - while(b==0) + while(b==0){ b=read(single); - + if(b==0){ + try{ + while(receiveBuffer.isEmpty()){ + Thread.sleep(20); + } + }catch(InterruptedException ie){ + throw new IOException(ie); + } + } + } + if(b>0){ return single[0] & 0xFF; } @@ -153,9 +163,7 @@ else currentChunk=receiveBuffer.poll(10, TimeUnit.MILLISECONDS); }catch(InterruptedException ie){ - IOException ex=new IOException(); - ex.initCause(ie); - throw ex; + throw new IOException(ie); } return; } Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-05-25 08:03:03 UTC (rev 74) @@ -196,9 +196,14 @@ //starts the sender algorithm private void start(){ + Runnable r=new Runnable(){ public void run(){ try{ + while(session.getSocket()==null)Thread.sleep(100); + session.getSocket().getInputStream(); + + logger.info("STARTING RECEIVER for "+session); nextACK=Util.getCurrentTime()+ackTimerInterval; nextNAK=(long)(Util.getCurrentTime()+1.5*nakTimerInterval); nextEXP=Util.getCurrentTime()+2*expTimerInterval; @@ -224,6 +229,9 @@ */ protected void receive(UDTPacket p)throws IOException{ if(storeStatistics)dgReceiveInterval.end(); + if(!p.isControlPacket()){ + System.out.println("++ "+p+" queuesize="+handoffQueue.size()); + } handoffQueue.offer(p); if(storeStatistics)dgReceiveInterval.begin(); } @@ -265,6 +273,7 @@ needEXPReset=true; } } + if(needEXPReset){ nextEXP=Util.getCurrentTime()+expTimerInterval; } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2012-05-25 08:03:03 UTC (rev 74) @@ -151,7 +151,7 @@ * start the sender thread */ public void start(){ - logger.info("Starting sender for "+session); + logger.info("STARTING SENDER for "+session); startLatch.countDown(); started=true; } Modified: udt-java/trunk/src/main/java/udt/UDTServerSocket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTServerSocket.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTServerSocket.java 2012-05-25 08:03:03 UTC (rev 74) @@ -31,6 +31,7 @@ *********************************************************************************/ package udt; + import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; @@ -38,8 +39,8 @@ import java.util.logging.Logger; - public class UDTServerSocket { + private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); private final UDPEndPoint endpoint; Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2012-05-25 08:03:03 UTC (rev 74) @@ -39,6 +39,7 @@ import java.util.logging.Logger; import udt.packets.Destination; +import udt.util.SequenceNumber; import udt.util.UDTStatistics; public abstract class UDTSession { @@ -53,9 +54,9 @@ //state constants public static final int start=0; public static final int handshaking=1; - public static final int ready=2; - public static final int keepalive=3; - public static final int shutdown=4; + public static final int ready=50; + public static final int keepalive=80; + public static final int shutdown=90; public static final int invalid=99; @@ -70,6 +71,9 @@ //cache dgPacket (peer stays the same always) private DatagramPacket dgPacket; + //session cookie created during handshake + protected long sessionCookie=0; + /** * flow window size, i.e. how many data packets are * in-flight at a single time @@ -209,7 +213,7 @@ public synchronized long getInitialSequenceNumber(){ if(initialSequenceNumber==null){ - initialSequenceNumber=1l; //TODO must be random? + initialSequenceNumber=SequenceNumber.random(); } return initialSequenceNumber; } Modified: udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2012-05-25 08:03:03 UTC (rev 74) @@ -33,6 +33,8 @@ package udt.packets; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; import udt.UDTSession; @@ -49,21 +51,29 @@ private long packetSize; private long maxFlowWndSize; - public static final long CONNECTION_TYPE_REGULAR=1; + public static final long CONNECTION_TYPE_REGULAR=1L; - public static final long CONNECTION_TYPE_RENDEZVOUS=0; + public static final long CONNECTION_TYPE_RENDEZVOUS=0L; + /** + * connection type in response handshake packet + */ + public static final long CONNECTION_SERVER_ACK=-1L; + private long connectionType = CONNECTION_TYPE_REGULAR;//regular or rendezvous mode private long socketID; private long cookie=0; + //address of the UDP socket + private InetAddress address; + public ConnectionHandshake(){ this.controlPacketType=ControlPacketType.CONNECTION_HANDSHAKE.ordinal(); } - public ConnectionHandshake(byte[]controlInformation){ + public ConnectionHandshake(byte[]controlInformation)throws IOException{ this(); decode(controlInformation); } @@ -73,7 +83,7 @@ return true; } - void decode(byte[]data){ + void decode(byte[]data)throws IOException{ udtVersion =PacketUtil.decode(data, 0); socketType=PacketUtil.decode(data, 4); initialSeqNo=PacketUtil.decode(data, 8); @@ -81,9 +91,9 @@ maxFlowWndSize=PacketUtil.decode(data, 16); connectionType=PacketUtil.decode(data, 20); socketID=PacketUtil.decode(data, 24); - if(data.length>28){ - cookie=PacketUtil.decode(data, 28); - } + cookie=PacketUtil.decode(data, 28); + //TODO ipv6 check + address=PacketUtil.decodeInetAddress(data, 32, false); } public long getUdtVersion() { @@ -134,11 +144,25 @@ public void setSocketID(long socketID) { this.socketID = socketID; } + public long getCookie() { + return cookie; + } + public void setCookie(long cookie) { + this.cookie = cookie; + } + public InetAddress getAddress() { + return address; + } + + public void setAddress(InetAddress address) { + this.address = address; + } + @Override public byte[] encodeControlInformation(){ try { - ByteArrayOutputStream bos=new ByteArrayOutputStream(24); + ByteArrayOutputStream bos=new ByteArrayOutputStream(48); bos.write(PacketUtil.encode(udtVersion)); bos.write(PacketUtil.encode(socketType)); bos.write(PacketUtil.encode(initialSeqNo)); @@ -146,6 +170,8 @@ bos.write(PacketUtil.encode(maxFlowWndSize)); bos.write(PacketUtil.encode(connectionType)); bos.write(PacketUtil.encode(socketID)); + bos.write(PacketUtil.encode(cookie)); + bos.write(PacketUtil.encode(address)); return bos.toByteArray(); } catch (Exception e) { // can't happen @@ -178,6 +204,10 @@ return false; if (udtVersion != other.udtVersion) return false; + if (cookie!=other.cookie) + return false; + if (!address.equals(other.address)) + return false; return true; } @@ -198,6 +228,7 @@ sb.append(", socketType=").append(socketType); sb.append(", destSocketID=").append(destinationID); if(cookie>0)sb.append(", cookie=").append(cookie); + sb.append(", address=").append(address); sb.append("]"); return sb.toString(); } Modified: udt-java/trunk/src/main/java/udt/packets/PacketFactory.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2012-05-25 08:03:03 UTC (rev 74) @@ -31,6 +31,8 @@ *********************************************************************************/ package udt.packets; +import java.io.IOException; + import udt.UDTPacket; import udt.packets.ControlPacket.*; @@ -42,13 +44,13 @@ * @param packetData * @return */ - public static UDTPacket createPacket(byte[]encodedData){ + public static UDTPacket createPacket(byte[]encodedData)throws IOException{ boolean isControl=(encodedData[0]&128) !=0 ; if(isControl)return createControlPacket(encodedData,encodedData.length); return new DataPacket(encodedData); } - public static UDTPacket createPacket(byte[]encodedData,int length){ + public static UDTPacket createPacket(byte[]encodedData,int length)throws IOException{ boolean isControl=(encodedData[0]&128) !=0 ; if(isControl)return createControlPacket(encodedData,length); return new DataPacket(encodedData,length); @@ -59,7 +61,7 @@ * @param packetData * @return */ - public static ControlPacket createControlPacket(byte[]encodedData,int length){ + public static ControlPacket createControlPacket(byte[]encodedData,int length)throws IOException{ ControlPacket packet=null; Modified: udt-java/trunk/src/main/java/udt/packets/PacketUtil.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2012-05-25 08:03:03 UTC (rev 74) @@ -32,7 +32,10 @@ package udt.packets; +import java.net.InetAddress; +import java.net.UnknownHostException; + public class PacketUtil { public static byte[]encode(long value){ @@ -80,4 +83,30 @@ return result; } + /** + * encodes the specified address into 128 bit + * @param address - inet address + */ + public static byte[] encode(InetAddress address){ + byte[]res=new byte[16]; + byte[]add=address.getAddress(); + System.arraycopy(add, 0, res, 0, add.length); + return res; + } + + public static InetAddress decodeInetAddress(byte[]data, int start, boolean ipV6)throws UnknownHostException{ + InetAddress result=null; + byte[] add=ipV6?new byte[16]:new byte[4]; + System.arraycopy(data, start, add, 0, add.length); + result=InetAddress.getByAddress(add); + return result; + } + + public static void print(byte[]arr){ + System.out.print("["); + for(byte b: arr){ + System.out.print(" "+(b&0xFF)); + } + System.out.println(" ]"); + } } Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2012-05-25 08:03:03 UTC (rev 74) @@ -54,7 +54,9 @@ try{ long seq=data.getSequenceNumber(); //if already have this chunk, discard it - if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0)return true; + if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0){ + return true; + } //else compute insert position int offset=(int)SequenceNumber.seqOffset(initialSequenceNumber, seq); int insert=offset% size; Modified: udt-java/trunk/src/main/java/udt/util/SequenceNumber.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/SequenceNumber.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/util/SequenceNumber.java 2012-05-25 08:03:03 UTC (rev 74) @@ -13,7 +13,7 @@ private final static long maxSequenceNo=0x7FFFFFFF; - + private final static Random rand=new Random(); /** * compare seq1 and seq2. Returns zero, if they are equal, a negative value if seq1 is smaller than * seq2, and a positive value if seq1 is larger than seq2. @@ -67,7 +67,7 @@ * generates a random number between 1 and 0x3FFFFFFF (inclusive) */ public static long random(){ - return 1+new Random().nextInt(maxOffset); + return 1+rand.nextInt(maxOffset); } } Modified: udt-java/trunk/src/test/java/echo/EchoServer.java =================================================================== --- udt-java/trunk/src/test/java/echo/EchoServer.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/test/java/echo/EchoServer.java 2012-05-25 08:03:03 UTC (rev 74) @@ -78,7 +78,7 @@ String line=readLine(in); if(line!=null){ System.out.println("ECHO: "+line); - //else echo back the line + //echo back the line writer.println(line); writer.flush(); } Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-05-25 08:03:03 UTC (rev 74) @@ -101,7 +101,6 @@ try{ for(int i=0;i<blocks.length;i++){ while(!is.haveNewData(i+1, blocks[i])){ - Thread.yield(); Thread.sleep(100); } } Modified: udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java =================================================================== --- udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-05-25 08:03:03 UTC (rev 74) @@ -3,17 +3,20 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.net.InetAddress; import java.util.ArrayList; import java.util.List; import org.junit.Test; import udt.UDTPacket; +import udt.util.SequenceNumber; public class TestPacketFactory { @Test - public void testData(){ + public void testData()throws IOException{ String test="sdjfsdjfldskjflds"; byte[]data=test.getBytes(); @@ -26,7 +29,7 @@ } @Test - public void testConnectionHandshake(){ + public void testConnectionHandshake()throws IOException{ ConnectionHandshake p1 = new ConnectionHandshake(); p1.setMessageNumber(9876); p1.setTimeStamp(3456); @@ -39,8 +42,9 @@ p1.setMaxFlowWndSize(128); p1.setSocketID(1); p1.setUdtVersion(4); - - + p1.setAddress(InetAddress.getLocalHost()); + p1.setCookie(SequenceNumber.random()); + byte[]p1_data=p1.getEncoded(); UDTPacket p=PacketFactory.createPacket(p1_data); @@ -50,7 +54,7 @@ } @Test - public void testAcknowledgement(){ + public void testAcknowledgement()throws IOException{ Acknowledgement p1 = new Acknowledgement(); p1.setAckSequenceNumber(1234); p1.setMessageNumber(9876); @@ -70,7 +74,7 @@ } @Test - public void testAcknowledgementOfAcknowledgement(){ + public void testAcknowledgementOfAcknowledgement()throws IOException{ Acknowledgment2 p1 = new Acknowledgment2(); p1.setAckSequenceNumber(1230); p1.setMessageNumber(9871); @@ -86,7 +90,7 @@ } @Test - public void testNegativeAcknowledgement(){ + public void testNegativeAcknowledgement()throws IOException{ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); p1.setTimeStamp(3452); @@ -105,7 +109,7 @@ } @Test - public void testNegativeAcknowledgement2(){ + public void testNegativeAcknowledgement2()throws IOException{ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); p1.setTimeStamp(3452); @@ -130,7 +134,7 @@ } @Test - public void testNegativeAcknowledgement3(){ + public void testNegativeAcknowledgement3()throws IOException{ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); p1.setTimeStamp(3452); @@ -148,13 +152,12 @@ } @Test - public void testShutdown(){ + public void testShutdown()throws IOException{ Shutdown p1 = new Shutdown(); p1.setMessageNumber(9874); p1.setTimeStamp(3453); p1.setDestinationID(3); - byte[]p1_data=p1.getEncoded(); UDTPacket p=PacketFactory.createPacket(p1_data); @@ -164,7 +167,7 @@ @Test - public void testMessageDropRequest(){ + public void testMessageDropRequest()throws Exception{ MessageDropRequest p1=new MessageDropRequest(); p1.setMessageNumber(9876); p1.setTimeStamp(3456); @@ -181,5 +184,15 @@ MessageDropRequest p2=(MessageDropRequest)p; assertEquals(p1,p2); } + + @Test + public void testPacketUtil()throws Exception{ + InetAddress i=InetAddress.getLocalHost(); + byte[]enc=PacketUtil.encode(i); + PacketUtil.print(enc); + InetAddress i2=PacketUtil.decodeInetAddress(enc, 0, false); + System.out.println(i2); + assertEquals(i, i2); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |