udt-java-commits Mailing List for UDT-Java
Status: Alpha
Brought to you by:
bschuller
You can subscribe to this list here.
2010 |
Jan
|
Feb
|
Mar
|
Apr
(17) |
May
(4) |
Jun
(1) |
Jul
|
Aug
(3) |
Sep
(7) |
Oct
|
Nov
(1) |
Dec
|
---|---|---|---|---|---|---|---|---|---|---|---|---|
2011 |
Jan
(1) |
Feb
(1) |
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
(10) |
Sep
|
Oct
(1) |
Nov
(1) |
Dec
(2) |
2012 |
Jan
|
Feb
(4) |
Mar
(1) |
Apr
|
May
(1) |
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: <bsc...@us...> - 2012-05-25 08:03:15
|
Revision: 74 http://udt-java.svn.sourceforge.net/udt-java/?rev=74&view=rev Author: bschuller Date: 2012-05-25 08:03:03 +0000 (Fri, 25 May 2012) Log Message: ----------- attempt to switch to new UDT-C++ style of connection handshake. DOES NOT WORK yet Modified Paths: -------------- udt-java/trunk/pom.xml udt-java/trunk/src/main/java/udt/ClientSession.java udt-java/trunk/src/main/java/udt/ServerSession.java udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTClient.java udt-java/trunk/src/main/java/udt/UDTInputStream.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTServerSocket.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java udt-java/trunk/src/main/java/udt/packets/PacketFactory.java udt-java/trunk/src/main/java/udt/packets/PacketUtil.java udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/trunk/src/main/java/udt/util/SequenceNumber.java udt-java/trunk/src/test/java/echo/EchoServer.java udt-java/trunk/src/test/java/udt/TestUDTInputStream.java udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java Modified: udt-java/trunk/pom.xml =================================================================== --- udt-java/trunk/pom.xml 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/pom.xml 2012-05-25 08:03:03 UTC (rev 74) @@ -5,7 +5,7 @@ <artifactId>udt-java</artifactId> <packaging>jar</packaging> <name>UDT Java implementation</name> - <version>0.6-SNAPSHOT</version> + <version>0.7-SNAPSHOT</version> <url>http://sourceforge.net/projects/udt-java</url> <developers> <developer> Modified: udt-java/trunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ClientSession.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/ClientSession.java 2012-05-25 08:03:03 UTC (rev 74) @@ -52,6 +52,8 @@ private UDPEndPoint endPoint; + long initialSequenceNo=SequenceNumber.random(); + public ClientSession(UDPEndPoint endPoint, Destination dest)throws SocketException{ super("ClientSession localPort="+endPoint.getLocalPort(),dest); this.endPoint=endPoint; @@ -60,7 +62,7 @@ /** * send connection handshake until a reply from server is received - * TODO check for timeout + * @throws InterruptedException * @throws IOException */ @@ -68,11 +70,21 @@ public void connect() throws InterruptedException,IOException{ int n=0; while(getState()!=ready){ - sendHandShake(); if(getState()==invalid)throw new IOException("Can't connect!"); - n++; - if(getState()!=ready)Thread.sleep(500); + if(getState()<=handshaking){ + setState(handshaking); + sendInitialHandShake(); + } + else if(getState()==handshaking+1){ + sendSecondHandshake(); + } + + if(getState()==invalid)throw new IOException("Can't connect!"); + if(n++ > 10)throw new IOException("Could not connect to server within the timeout."); + + Thread.sleep(500); } + Thread.sleep(1000); cc.init(); logger.info("Connected, "+n+" handshake packets sent"); } @@ -82,38 +94,10 @@ lastPacket=packet; - if (packet instanceof ConnectionHandshake) { + if (packet.isConnectionHandshake()) { ConnectionHandshake hs=(ConnectionHandshake)packet; - - logger.info("Received connection handshake from "+peer+"\n"+hs); - - if (getState()!=ready) { - if(hs.getConnectionType()==1){ - try{ - //TODO validate parameters sent by peer - long peerSocketID=hs.getSocketID(); - destination.setSocketID(peerSocketID); - sendConfirmation(hs); - }catch(Exception ex){ - logger.log(Level.WARNING,"Error creating socket",ex); - setState(invalid); - } - return; - } - else{ - try{ - //TODO validate parameters sent by peer - long peerSocketID=hs.getSocketID(); - destination.setSocketID(peerSocketID); - setState(ready); - socket=new UDTSocket(endPoint,this); - }catch(Exception ex){ - logger.log(Level.WARNING,"Error creating socket",ex); - setState(invalid); - } - return; - } - } + handleConnectionHandshake(hs,peer); + return; } if(getState() == ready) { @@ -140,9 +124,43 @@ } } + protected void handleConnectionHandshake(ConnectionHandshake hs, Destination peer){ - //handshake for connect - protected void sendHandShake()throws IOException{ + if (getState()==handshaking) { + logger.info("Received initial handshake response from "+peer+"\n"+hs); + if(hs.getConnectionType()==ConnectionHandshake.CONNECTION_SERVER_ACK){ + try{ + //TODO validate parameters sent by peer + long peerSocketID=hs.getSocketID(); + sessionCookie=hs.getCookie(); + destination.setSocketID(peerSocketID); + setState(handshaking+1); + }catch(Exception ex){ + logger.log(Level.WARNING,"Error creating socket",ex); + setState(invalid); + } + return; + } + else{ + logger.info("Unexpected type of handshake packet received"); + setState(invalid); + } + } + else if(getState()==handshaking+1){ + try{ + logger.info("Received confirmation handshake response from "+peer+"\n"+hs); + //TODO validate parameters sent by peer + setState(ready); + socket=new UDTSocket(endPoint,this); + }catch(Exception ex){ + logger.log(Level.WARNING,"Error creating socket",ex); + setState(invalid); + } + } + } + + //initial handshake for connect + protected void sendInitialHandShake()throws IOException{ ConnectionHandshake handshake = new ConnectionHandshake(); handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR); handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM); @@ -153,20 +171,24 @@ handshake.setSocketID(mySocketID); handshake.setMaxFlowWndSize(flowWindowSize); handshake.setSession(this); + handshake.setAddress(endPoint.getLocalAddress()); logger.info("Sending "+handshake); endPoint.doSend(handshake); } //2nd handshake for connect - protected void sendConfirmation(ConnectionHandshake hs)throws IOException{ + protected void sendSecondHandshake()throws IOException{ ConnectionHandshake handshake = new ConnectionHandshake(); - handshake.setConnectionType(-1); + handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR); handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM); - handshake.setInitialSeqNo(hs.getInitialSeqNo()); - handshake.setPacketSize(hs.getPacketSize()); + handshake.setInitialSeqNo(initialSequenceNo); + handshake.setPacketSize(getDatagramSize()); handshake.setSocketID(mySocketID); handshake.setMaxFlowWndSize(flowWindowSize); handshake.setSession(this); + handshake.setCookie(sessionCookie); + handshake.setAddress(endPoint.getLocalAddress()); + handshake.setDestinationID(getDestination().getSocketID()); logger.info("Sending confirmation "+handshake); endPoint.doSend(handshake); } Modified: udt-java/trunk/src/main/java/udt/ServerSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ServerSession.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/ServerSession.java 2012-05-25 08:03:03 UTC (rev 74) @@ -33,7 +33,6 @@ package udt; import java.io.IOException; -import java.net.DatagramPacket; import java.net.SocketException; import java.net.UnknownHostException; import java.util.logging.Level; @@ -43,6 +42,7 @@ import udt.packets.Destination; import udt.packets.KeepAlive; import udt.packets.Shutdown; +import udt.util.SequenceNumber; /** * server side session in client-server mode @@ -56,10 +56,10 @@ //last received packet (for testing purposes) private UDTPacket lastPacket; - public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{ - super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new Destination(dp.getAddress(),dp.getPort())); + public ServerSession(Destination peer, UDPEndPoint endPoint)throws SocketException,UnknownHostException{ + super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+peer.getAddress()+":"+peer.getPort(),peer); this.endPoint=endPoint; - logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort()); + logger.info("Created "+toString()+" talking to "+peer.getAddress()+":"+peer.getPort()); } int n_handshake=0; @@ -68,79 +68,45 @@ public void received(UDTPacket packet, Destination peer){ lastPacket=packet; - if(packet instanceof ConnectionHandshake) { - ConnectionHandshake connectionHandshake=(ConnectionHandshake)packet; - logger.info("Received "+connectionHandshake); + if(packet.isConnectionHandshake()) { + handleHandShake((ConnectionHandshake)packet); + return; + } - if (getState()<=ready){ - destination.setSocketID(connectionHandshake.getSocketID()); - - if(getState()<=handshaking){ - setState(handshaking); - } - try{ - handleHandShake(connectionHandshake); - n_handshake++; - try{ - setState(ready); - socket=new UDTSocket(endPoint, this); - cc.init(); - }catch(Exception uhe){ - //session is invalid - logger.log(Level.SEVERE,"",uhe); - setState(invalid); - } - }catch(IOException ex){ - //session invalid - logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex); - setState(invalid); - } - return; - } - - }else if(packet instanceof KeepAlive) { + if(packet instanceof KeepAlive) { socket.getReceiver().resetEXPTimer(); active = true; return; } - if(getState()== ready) { - active = true; - - if (packet instanceof KeepAlive) { - //nothing to do here - return; - }else if (packet instanceof Shutdown) { - try{ - socket.getReceiver().stop(); - }catch(IOException ex){ - logger.log(Level.WARNING,"",ex); - } - setState(shutdown); - System.out.println("SHUTDOWN ***"); - active = false; - logger.info("Connection shutdown initiated by the other side."); - return; + if (packet instanceof Shutdown) { + try{ + socket.getReceiver().stop(); + }catch(IOException ex){ + logger.log(Level.WARNING,"",ex); } + setState(shutdown); + active = false; + logger.info("Connection shutdown initiated by peer."); + return; + } - else{ - try{ - if(packet.forSender()){ - socket.getSender().receive(packet); - }else{ - socket.getReceiver().receive(packet); - } - }catch(Exception ex){ - //session invalid - logger.log(Level.SEVERE,"",ex); - setState(invalid); + if(getState() == ready) { + active = true; + try{ + if(packet.forSender()){ + socket.getSender().receive(packet); + }else{ + socket.getReceiver().receive(packet); } + }catch(Exception ex){ + //session invalid + logger.log(Level.SEVERE,"",ex); + setState(invalid); } return; - } - } /** @@ -151,16 +117,73 @@ } /** - * handle the connection handshake:<br/> - * <ul> - * <li>set initial sequence number</li> - * <li>send response handshake</li> - * </ul> + * reply to a connection handshake message + * @param connectionHandshake + */ + protected void handleHandShake(ConnectionHandshake connectionHandshake){ + logger.info("Received "+connectionHandshake + " in state <"+getState()+">"); + if(getState()==ready){ + //just send confirmation packet again + try{ + sendFinalHandShake(connectionHandshake); + }catch(IOException io){} + return; + } + + if (getState()<ready){ + destination.setSocketID(connectionHandshake.getSocketID()); + + if(getState()<handshaking){ + setState(handshaking); + } + + try{ + n_handshake++; + boolean handShakeComplete=handleSecondHandShake(connectionHandshake); + if(handShakeComplete){ + logger.info("Client/Server handshake complete!"); + setState(ready); + socket=new UDTSocket(endPoint, this); + cc.init(); + } + }catch(IOException ex){ + //session invalid + logger.log(Level.WARNING,"Error processing ConnectionHandshake",ex); + setState(invalid); + } + } + } + + private ConnectionHandshake finalConnectionHandshake; + + /** + * handle the connection handshake + * * @param handshake * @param peer * @throws IOException */ - protected void handleHandShake(ConnectionHandshake handshake)throws IOException{ + protected boolean handleSecondHandShake(ConnectionHandshake handshake)throws IOException{ + if(sessionCookie==0){ + ackInitialHandshake(handshake); + //need one more handshake + return false; + } + + long otherCookie=handshake.getCookie(); + if(sessionCookie!=otherCookie){ + setState(invalid); + throw new IOException("Invalid cookie <"+otherCookie+"> received, my cookie is <"+sessionCookie+">"); + } + sendFinalHandShake(handshake); + return true; + } + + /* + * response after the initial connection handshake received: + * compute cookie + */ + protected void ackInitialHandshake(ConnectionHandshake handshake)throws IOException{ ConnectionHandshake responseHandshake = new ConnectionHandshake(); //compare the packet size and choose minimun long clientBufferSize=handshake.getPacketSize(); @@ -178,12 +201,40 @@ responseHandshake.setSocketID(mySocketID); responseHandshake.setDestinationID(this.getDestination().getSocketID()); responseHandshake.setSession(this); + sessionCookie=SequenceNumber.random(); + responseHandshake.setCookie(sessionCookie); + responseHandshake.setAddress(endPoint.getLocalAddress()); logger.info("Sending reply "+responseHandshake); endPoint.doSend(responseHandshake); } + protected void sendFinalHandShake(ConnectionHandshake handshake)throws IOException{ + if(finalConnectionHandshake==null){ + finalConnectionHandshake= new ConnectionHandshake(); + //compare the packet size and choose minimun + long clientBufferSize=handshake.getPacketSize(); + long myBufferSize=getDatagramSize(); + long bufferSize=Math.min(clientBufferSize, myBufferSize); + long initialSequenceNumber=handshake.getInitialSeqNo(); + setInitialSequenceNumber(initialSequenceNumber); + setDatagramSize((int)bufferSize); + finalConnectionHandshake.setPacketSize(bufferSize); + finalConnectionHandshake.setUdtVersion(4); + finalConnectionHandshake.setInitialSeqNo(initialSequenceNumber); + finalConnectionHandshake.setConnectionType(-1); + finalConnectionHandshake.setMaxFlowWndSize(handshake.getMaxFlowWndSize()); + //tell peer what the socket ID on this side is + finalConnectionHandshake.setSocketID(mySocketID); + finalConnectionHandshake.setDestinationID(this.getDestination().getSocketID()); + finalConnectionHandshake.setSession(this); + finalConnectionHandshake.setCookie(sessionCookie); + finalConnectionHandshake.setAddress(endPoint.getLocalAddress()); + } + logger.info("Sending final handshake ack "+finalConnectionHandshake); + endPoint.doSend(finalConnectionHandshake); + } } Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2012-05-25 08:03:03 UTC (rev 74) @@ -40,6 +40,8 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.SynchronousQueue; @@ -70,10 +72,12 @@ //last received packet private UDTPacket lastPacket; + private final Map<Destination,UDTSession> sessionsBeingConnected=Collections.synchronizedMap(new HashMap<Destination,UDTSession>()); + //if the endpoint is configured for a server socket, //this queue is used to handoff new UDTSessions to the application private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>(); - + private boolean serverSocketMode=false; //has the endpoint been stopped? @@ -90,7 +94,7 @@ this.dgSocket=socket; port=dgSocket.getLocalPort(); } - + /** * bind to any local port on the given host address * @param localAddress @@ -116,7 +120,7 @@ } if(localPort>0)this.port = localPort; else port=dgSocket.getLocalPort(); - + configureSocket(); } @@ -127,7 +131,7 @@ dgSocket.setReceiveBufferSize(128*1024); dgSocket.setReuseAddress(false); } - + /** * bind to the default network interface on the machine * @@ -240,79 +244,75 @@ * </ul> * @throws IOException */ - private long lastDestID=-1; - private UDTSession lastSession; - - private int n=0; - - private final Object lock=new Object(); - protected void doReceive()throws IOException{ while(!stopped){ try{ - try{ - - //will block until a packet is received or timeout has expired - dgSocket.receive(dp); - - Destination peer=new Destination(dp.getAddress(), dp.getPort()); - int l=dp.getLength(); - UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); - lastPacket=packet; + //will block until a packet is received or timeout has expired + dgSocket.receive(dp); - //handle connection handshake - if(packet.isConnectionHandshake()){ - synchronized(lock){ - Long id=Long.valueOf(packet.getDestinationID()); - UDTSession session=sessions.get(id); - if(session==null){ - session=new ServerSession(dp,this); - addSession(session.getSocketID(),session); - //TODO need to check peer to avoid duplicate server session - if(serverSocketMode){ - logger.fine("Pooling new request."); - sessionHandoff.put(session); - logger.fine("Request taken for processing."); - } - } - peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); - session.received(packet,peer); - } - } - else{ - //dispatch to existing session - long dest=packet.getDestinationID(); - UDTSession session; - if(dest==lastDestID){ - session=lastSession; - } - else{ - session=sessions.get(dest); - lastSession=session; - lastDestID=dest; - } - if(session==null){ - n++; - if(n%100==1){ - logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); - } - } - else{ - session.received(packet,peer); - } - } - }catch(SocketException ex){ - logger.log(Level.INFO, "SocketException: "+ex.getMessage()); - }catch(SocketTimeoutException ste){ - //can safely ignore... we will retry until the endpoint is stopped + Destination peer=new Destination(dp.getAddress(), dp.getPort()); + int l=dp.getLength(); + UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); + lastPacket=packet; + + long dest=packet.getDestinationID(); + UDTSession session=sessions.get(dest); + if(session!=null){ + //dispatch to existing session + session.received(packet,peer); } - + else if(packet.isConnectionHandshake()){ + connectionHandshake((ConnectionHandshake)packet, peer); + } + else{ + logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); + } + }catch(SocketException ex){ + logger.log(Level.INFO, "SocketException: "+ex.getMessage()); + }catch(SocketTimeoutException ste){ + //can safely ignore... we will retry until the endpoint is stopped }catch(Exception ex){ logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); } } } + /** + * called when a "connection handshake" packet was received and no + * matching session yet exists + * + * @param packet + * @param peer + * @throws IOException + * @throws InterruptedException + */ + protected synchronized void connectionHandshake(ConnectionHandshake packet, Destination peer)throws IOException, InterruptedException{ + Destination p=new Destination(peer.getAddress(),peer.getPort()); + UDTSession session=sessionsBeingConnected.get(peer); + long destID=packet.getDestinationID(); + if(session!=null && session.getSocketID()==destID){ + //confirmation handshake + sessionsBeingConnected.remove(p); + addSession(destID, session); + } + else if(session==null){ + session=new ServerSession(peer,this); + sessionsBeingConnected.put(p,session); + sessions.put(session.getSocketID(), session); + if(serverSocketMode){ + logger.fine("Pooling new request."); + sessionHandoff.put(session); + logger.fine("Request taken for processing."); + } + } + else { + throw new IOException("dest ID sent by client does not match"); + } + Long peerSocketID=((ConnectionHandshake)packet).getSocketID(); + peer.setSocketID(peerSocketID); + session.received(packet,peer); + } + protected void doSend(UDTPacket packet)throws IOException{ byte[]data=packet.getEncoded(); DatagramPacket dgp = packet.getSession().getDatagram(); @@ -327,4 +327,5 @@ public void sendRaw(DatagramPacket p)throws IOException{ dgSocket.send(p); } + } Modified: udt-java/trunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTClient.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTClient.java 2012-05-25 08:03:03 UTC (rev 74) @@ -80,12 +80,11 @@ //create client session... clientSession=new ClientSession(clientEndpoint,destination); clientEndpoint.addSession(clientSession.getSocketID(), clientSession); - clientEndpoint.start(); clientSession.connect(); //wait for handshake while(!clientSession.isReady()){ - Thread.sleep(5); + Thread.sleep(50); } logger.info("The UDTClient is connected"); } Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2012-05-25 08:03:03 UTC (rev 74) @@ -78,9 +78,19 @@ @Override public int read()throws IOException{ int b=0; - while(b==0) + while(b==0){ b=read(single); - + if(b==0){ + try{ + while(receiveBuffer.isEmpty()){ + Thread.sleep(20); + } + }catch(InterruptedException ie){ + throw new IOException(ie); + } + } + } + if(b>0){ return single[0] & 0xFF; } @@ -153,9 +163,7 @@ else currentChunk=receiveBuffer.poll(10, TimeUnit.MILLISECONDS); }catch(InterruptedException ie){ - IOException ex=new IOException(); - ex.initCause(ie); - throw ex; + throw new IOException(ie); } return; } Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-05-25 08:03:03 UTC (rev 74) @@ -196,9 +196,14 @@ //starts the sender algorithm private void start(){ + Runnable r=new Runnable(){ public void run(){ try{ + while(session.getSocket()==null)Thread.sleep(100); + session.getSocket().getInputStream(); + + logger.info("STARTING RECEIVER for "+session); nextACK=Util.getCurrentTime()+ackTimerInterval; nextNAK=(long)(Util.getCurrentTime()+1.5*nakTimerInterval); nextEXP=Util.getCurrentTime()+2*expTimerInterval; @@ -224,6 +229,9 @@ */ protected void receive(UDTPacket p)throws IOException{ if(storeStatistics)dgReceiveInterval.end(); + if(!p.isControlPacket()){ + System.out.println("++ "+p+" queuesize="+handoffQueue.size()); + } handoffQueue.offer(p); if(storeStatistics)dgReceiveInterval.begin(); } @@ -265,6 +273,7 @@ needEXPReset=true; } } + if(needEXPReset){ nextEXP=Util.getCurrentTime()+expTimerInterval; } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2012-05-25 08:03:03 UTC (rev 74) @@ -151,7 +151,7 @@ * start the sender thread */ public void start(){ - logger.info("Starting sender for "+session); + logger.info("STARTING SENDER for "+session); startLatch.countDown(); started=true; } Modified: udt-java/trunk/src/main/java/udt/UDTServerSocket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTServerSocket.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTServerSocket.java 2012-05-25 08:03:03 UTC (rev 74) @@ -31,6 +31,7 @@ *********************************************************************************/ package udt; + import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; @@ -38,8 +39,8 @@ import java.util.logging.Logger; - public class UDTServerSocket { + private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); private final UDPEndPoint endpoint; Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2012-05-25 08:03:03 UTC (rev 74) @@ -39,6 +39,7 @@ import java.util.logging.Logger; import udt.packets.Destination; +import udt.util.SequenceNumber; import udt.util.UDTStatistics; public abstract class UDTSession { @@ -53,9 +54,9 @@ //state constants public static final int start=0; public static final int handshaking=1; - public static final int ready=2; - public static final int keepalive=3; - public static final int shutdown=4; + public static final int ready=50; + public static final int keepalive=80; + public static final int shutdown=90; public static final int invalid=99; @@ -70,6 +71,9 @@ //cache dgPacket (peer stays the same always) private DatagramPacket dgPacket; + //session cookie created during handshake + protected long sessionCookie=0; + /** * flow window size, i.e. how many data packets are * in-flight at a single time @@ -209,7 +213,7 @@ public synchronized long getInitialSequenceNumber(){ if(initialSequenceNumber==null){ - initialSequenceNumber=1l; //TODO must be random? + initialSequenceNumber=SequenceNumber.random(); } return initialSequenceNumber; } Modified: udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2012-05-25 08:03:03 UTC (rev 74) @@ -33,6 +33,8 @@ package udt.packets; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; import udt.UDTSession; @@ -49,21 +51,29 @@ private long packetSize; private long maxFlowWndSize; - public static final long CONNECTION_TYPE_REGULAR=1; + public static final long CONNECTION_TYPE_REGULAR=1L; - public static final long CONNECTION_TYPE_RENDEZVOUS=0; + public static final long CONNECTION_TYPE_RENDEZVOUS=0L; + /** + * connection type in response handshake packet + */ + public static final long CONNECTION_SERVER_ACK=-1L; + private long connectionType = CONNECTION_TYPE_REGULAR;//regular or rendezvous mode private long socketID; private long cookie=0; + //address of the UDP socket + private InetAddress address; + public ConnectionHandshake(){ this.controlPacketType=ControlPacketType.CONNECTION_HANDSHAKE.ordinal(); } - public ConnectionHandshake(byte[]controlInformation){ + public ConnectionHandshake(byte[]controlInformation)throws IOException{ this(); decode(controlInformation); } @@ -73,7 +83,7 @@ return true; } - void decode(byte[]data){ + void decode(byte[]data)throws IOException{ udtVersion =PacketUtil.decode(data, 0); socketType=PacketUtil.decode(data, 4); initialSeqNo=PacketUtil.decode(data, 8); @@ -81,9 +91,9 @@ maxFlowWndSize=PacketUtil.decode(data, 16); connectionType=PacketUtil.decode(data, 20); socketID=PacketUtil.decode(data, 24); - if(data.length>28){ - cookie=PacketUtil.decode(data, 28); - } + cookie=PacketUtil.decode(data, 28); + //TODO ipv6 check + address=PacketUtil.decodeInetAddress(data, 32, false); } public long getUdtVersion() { @@ -134,11 +144,25 @@ public void setSocketID(long socketID) { this.socketID = socketID; } + public long getCookie() { + return cookie; + } + public void setCookie(long cookie) { + this.cookie = cookie; + } + public InetAddress getAddress() { + return address; + } + + public void setAddress(InetAddress address) { + this.address = address; + } + @Override public byte[] encodeControlInformation(){ try { - ByteArrayOutputStream bos=new ByteArrayOutputStream(24); + ByteArrayOutputStream bos=new ByteArrayOutputStream(48); bos.write(PacketUtil.encode(udtVersion)); bos.write(PacketUtil.encode(socketType)); bos.write(PacketUtil.encode(initialSeqNo)); @@ -146,6 +170,8 @@ bos.write(PacketUtil.encode(maxFlowWndSize)); bos.write(PacketUtil.encode(connectionType)); bos.write(PacketUtil.encode(socketID)); + bos.write(PacketUtil.encode(cookie)); + bos.write(PacketUtil.encode(address)); return bos.toByteArray(); } catch (Exception e) { // can't happen @@ -178,6 +204,10 @@ return false; if (udtVersion != other.udtVersion) return false; + if (cookie!=other.cookie) + return false; + if (!address.equals(other.address)) + return false; return true; } @@ -198,6 +228,7 @@ sb.append(", socketType=").append(socketType); sb.append(", destSocketID=").append(destinationID); if(cookie>0)sb.append(", cookie=").append(cookie); + sb.append(", address=").append(address); sb.append("]"); return sb.toString(); } Modified: udt-java/trunk/src/main/java/udt/packets/PacketFactory.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2012-05-25 08:03:03 UTC (rev 74) @@ -31,6 +31,8 @@ *********************************************************************************/ package udt.packets; +import java.io.IOException; + import udt.UDTPacket; import udt.packets.ControlPacket.*; @@ -42,13 +44,13 @@ * @param packetData * @return */ - public static UDTPacket createPacket(byte[]encodedData){ + public static UDTPacket createPacket(byte[]encodedData)throws IOException{ boolean isControl=(encodedData[0]&128) !=0 ; if(isControl)return createControlPacket(encodedData,encodedData.length); return new DataPacket(encodedData); } - public static UDTPacket createPacket(byte[]encodedData,int length){ + public static UDTPacket createPacket(byte[]encodedData,int length)throws IOException{ boolean isControl=(encodedData[0]&128) !=0 ; if(isControl)return createControlPacket(encodedData,length); return new DataPacket(encodedData,length); @@ -59,7 +61,7 @@ * @param packetData * @return */ - public static ControlPacket createControlPacket(byte[]encodedData,int length){ + public static ControlPacket createControlPacket(byte[]encodedData,int length)throws IOException{ ControlPacket packet=null; Modified: udt-java/trunk/src/main/java/udt/packets/PacketUtil.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2012-05-25 08:03:03 UTC (rev 74) @@ -32,7 +32,10 @@ package udt.packets; +import java.net.InetAddress; +import java.net.UnknownHostException; + public class PacketUtil { public static byte[]encode(long value){ @@ -80,4 +83,30 @@ return result; } + /** + * encodes the specified address into 128 bit + * @param address - inet address + */ + public static byte[] encode(InetAddress address){ + byte[]res=new byte[16]; + byte[]add=address.getAddress(); + System.arraycopy(add, 0, res, 0, add.length); + return res; + } + + public static InetAddress decodeInetAddress(byte[]data, int start, boolean ipV6)throws UnknownHostException{ + InetAddress result=null; + byte[] add=ipV6?new byte[16]:new byte[4]; + System.arraycopy(data, start, add, 0, add.length); + result=InetAddress.getByAddress(add); + return result; + } + + public static void print(byte[]arr){ + System.out.print("["); + for(byte b: arr){ + System.out.print(" "+(b&0xFF)); + } + System.out.println(" ]"); + } } Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2012-05-25 08:03:03 UTC (rev 74) @@ -54,7 +54,9 @@ try{ long seq=data.getSequenceNumber(); //if already have this chunk, discard it - if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0)return true; + if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0){ + return true; + } //else compute insert position int offset=(int)SequenceNumber.seqOffset(initialSequenceNumber, seq); int insert=offset% size; Modified: udt-java/trunk/src/main/java/udt/util/SequenceNumber.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/SequenceNumber.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/main/java/udt/util/SequenceNumber.java 2012-05-25 08:03:03 UTC (rev 74) @@ -13,7 +13,7 @@ private final static long maxSequenceNo=0x7FFFFFFF; - + private final static Random rand=new Random(); /** * compare seq1 and seq2. Returns zero, if they are equal, a negative value if seq1 is smaller than * seq2, and a positive value if seq1 is larger than seq2. @@ -67,7 +67,7 @@ * generates a random number between 1 and 0x3FFFFFFF (inclusive) */ public static long random(){ - return 1+new Random().nextInt(maxOffset); + return 1+rand.nextInt(maxOffset); } } Modified: udt-java/trunk/src/test/java/echo/EchoServer.java =================================================================== --- udt-java/trunk/src/test/java/echo/EchoServer.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/test/java/echo/EchoServer.java 2012-05-25 08:03:03 UTC (rev 74) @@ -78,7 +78,7 @@ String line=readLine(in); if(line!=null){ System.out.println("ECHO: "+line); - //else echo back the line + //echo back the line writer.println(line); writer.flush(); } Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-05-25 08:03:03 UTC (rev 74) @@ -101,7 +101,6 @@ try{ for(int i=0;i<blocks.length;i++){ while(!is.haveNewData(i+1, blocks[i])){ - Thread.yield(); Thread.sleep(100); } } Modified: udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java =================================================================== --- udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-03-28 06:34:15 UTC (rev 73) +++ udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-05-25 08:03:03 UTC (rev 74) @@ -3,17 +3,20 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.net.InetAddress; import java.util.ArrayList; import java.util.List; import org.junit.Test; import udt.UDTPacket; +import udt.util.SequenceNumber; public class TestPacketFactory { @Test - public void testData(){ + public void testData()throws IOException{ String test="sdjfsdjfldskjflds"; byte[]data=test.getBytes(); @@ -26,7 +29,7 @@ } @Test - public void testConnectionHandshake(){ + public void testConnectionHandshake()throws IOException{ ConnectionHandshake p1 = new ConnectionHandshake(); p1.setMessageNumber(9876); p1.setTimeStamp(3456); @@ -39,8 +42,9 @@ p1.setMaxFlowWndSize(128); p1.setSocketID(1); p1.setUdtVersion(4); - - + p1.setAddress(InetAddress.getLocalHost()); + p1.setCookie(SequenceNumber.random()); + byte[]p1_data=p1.getEncoded(); UDTPacket p=PacketFactory.createPacket(p1_data); @@ -50,7 +54,7 @@ } @Test - public void testAcknowledgement(){ + public void testAcknowledgement()throws IOException{ Acknowledgement p1 = new Acknowledgement(); p1.setAckSequenceNumber(1234); p1.setMessageNumber(9876); @@ -70,7 +74,7 @@ } @Test - public void testAcknowledgementOfAcknowledgement(){ + public void testAcknowledgementOfAcknowledgement()throws IOException{ Acknowledgment2 p1 = new Acknowledgment2(); p1.setAckSequenceNumber(1230); p1.setMessageNumber(9871); @@ -86,7 +90,7 @@ } @Test - public void testNegativeAcknowledgement(){ + public void testNegativeAcknowledgement()throws IOException{ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); p1.setTimeStamp(3452); @@ -105,7 +109,7 @@ } @Test - public void testNegativeAcknowledgement2(){ + public void testNegativeAcknowledgement2()throws IOException{ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); p1.setTimeStamp(3452); @@ -130,7 +134,7 @@ } @Test - public void testNegativeAcknowledgement3(){ + public void testNegativeAcknowledgement3()throws IOException{ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); p1.setTimeStamp(3452); @@ -148,13 +152,12 @@ } @Test - public void testShutdown(){ + public void testShutdown()throws IOException{ Shutdown p1 = new Shutdown(); p1.setMessageNumber(9874); p1.setTimeStamp(3453); p1.setDestinationID(3); - byte[]p1_data=p1.getEncoded(); UDTPacket p=PacketFactory.createPacket(p1_data); @@ -164,7 +167,7 @@ @Test - public void testMessageDropRequest(){ + public void testMessageDropRequest()throws Exception{ MessageDropRequest p1=new MessageDropRequest(); p1.setMessageNumber(9876); p1.setTimeStamp(3456); @@ -181,5 +184,15 @@ MessageDropRequest p2=(MessageDropRequest)p; assertEquals(p1,p2); } + + @Test + public void testPacketUtil()throws Exception{ + InetAddress i=InetAddress.getLocalHost(); + byte[]enc=PacketUtil.encode(i); + PacketUtil.print(enc); + InetAddress i2=PacketUtil.decodeInetAddress(enc, 0, false); + System.out.println(i2); + assertEquals(i, i2); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2012-03-28 06:34:22
|
Revision: 73 http://udt-java.svn.sourceforge.net/udt-java/?rev=73&view=rev Author: bschuller Date: 2012-03-28 06:34:15 +0000 (Wed, 28 Mar 2012) Log Message: ----------- avoid socket closed exception Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTReceiver.java Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-02-06 10:06:38 UTC (rev 72) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2012-03-28 06:34:15 UTC (rev 73) @@ -334,7 +334,7 @@ * process EXP event (see spec. p 13) */ protected void processEXPEvent()throws IOException{ - if(session.getSocket()==null)return; + if(session.getSocket()==null || !session.getSocket().isActive())return; UDTSender sender=session.getSocket().getSender(); //put all the unacknowledged packets in the senders loss list sender.putUnacknowledgedPacketsIntoLossList(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2012-02-06 10:06:48
|
Revision: 72 http://udt-java.svn.sourceforge.net/udt-java/?rev=72&view=rev Author: bschuller Date: 2012-02-06 10:06:38 +0000 (Mon, 06 Feb 2012) Log Message: ----------- keep track of data array length Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/packets/DataPacket.java Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2012-02-06 08:26:01 UTC (rev 71) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2012-02-06 10:06:38 UTC (rev 72) @@ -200,7 +200,11 @@ throughput.end(); throughput.begin(); } - sendBuffer.put(p.getPacketSequenceNumber(), p.getData()); + //store data for potential retransmit + int l=p.getLength(); + byte[]data=new byte[l]; + System.arraycopy(p.getData(), 0, data, 0, l); + sendBuffer.put(p.getPacketSequenceNumber(), data); unacknowledged.incrementAndGet(); } statistics.incNumberOfSentDataPackets(); Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2012-02-06 08:26:01 UTC (rev 71) +++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2012-02-06 10:06:38 UTC (rev 72) @@ -78,7 +78,7 @@ return this.data; } - public double getLength(){ + public int getLength(){ return dataLength; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2012-02-06 08:26:10
|
Revision: 71 http://udt-java.svn.sourceforge.net/udt-java/?rev=71&view=rev Author: bschuller Date: 2012-02-06 08:26:01 +0000 (Mon, 06 Feb 2012) Log Message: ----------- use system-specific line terminator char Modified Paths: -------------- udt-java/trunk/src/main/java/udt/util/Util.java Modified: udt-java/trunk/src/main/java/udt/util/Util.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/Util.java 2012-02-03 09:09:37 UTC (rev 70) +++ udt-java/trunk/src/main/java/udt/util/Util.java 2012-02-06 08:26:01 UTC (rev 71) @@ -86,7 +86,8 @@ * @throws IOException */ public static String readLine(InputStream input)throws IOException{ - return readLine(input, '\n'); + char term=System.getProperty("line.separator").charAt(0); + return readLine(input, term); } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2012-02-03 09:09:43
|
Revision: 70 http://udt-java.svn.sourceforge.net/udt-java/?rev=70&view=rev Author: bschuller Date: 2012-02-03 09:09:37 +0000 (Fri, 03 Feb 2012) Log Message: ----------- roll back change to FlowWindow; add some comments to make it easier to understand Modified Paths: -------------- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java Modified: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2012-02-03 06:57:00 UTC (rev 69) +++ udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2012-02-03 09:09:37 UTC (rev 70) @@ -6,10 +6,12 @@ /** * - * holds a fixed number of {@link DataPacket} instances which are sent out. + * Holds a fixed number of {@link DataPacket} instances which are sent out.<br/> * - * it is assumed that a single thread stores new data, and another single thread - * reads/removes data + * it is assumed that a single thread (the producer) stores new data, + * and another single thread (the consumer) reads/removes data.<br/> + * + * * * @author schuller */ @@ -23,12 +25,15 @@ private volatile boolean isFull=false; + //valid entries that can be read private volatile int validEntries=0; private volatile boolean isCheckout=false; + //index where the next data packet will be written to private volatile int writePos=0; + //one before the index where the next data packet will be read from private volatile int readPos=-1; private volatile int consumed=0; @@ -42,7 +47,7 @@ * @param chunksize - data chunk size */ public FlowWindow(int size, int chunksize){ - this.length=size; + this.length=size+1; packets=new DataPacket[length]; for(int i=0;i<packets.length;i++){ packets[i]=new DataPacket(); @@ -64,21 +69,25 @@ } if(isCheckout)throw new IllegalStateException(); isCheckout=true; - DataPacket p=packets[writePos]; - return p; + return packets[writePos]; }finally{ lock.unlock(); } } + /** + * notify the flow window that the data packet obtained by {@link #getForProducer()} + * has been filled with data and is ready for sending out + */ public void produce(){ lock.lock(); try{ + if(!isCheckout)throw new IllegalStateException(); isCheckout=false; writePos++; if(writePos==length)writePos=0; validEntries++; - isFull=validEntries==length; + isFull=validEntries==length-1; isEmpty=false; produced++; }finally{ @@ -88,11 +97,11 @@ public DataPacket consumeData(){ - if(isEmpty){ - return null; - } lock.lock(); try{ + if(isEmpty){ + return null; + } readPos++; DataPacket p=packets[readPos]; if(readPos==length-1)readPos=-1; @@ -133,6 +142,7 @@ StringBuilder sb=new StringBuilder(); sb.append("FlowWindow size=").append(length); sb.append(" full=").append(isFull).append(" empty=").append(isEmpty); + sb.append(" readPos=").append(readPos).append(" writePos=").append(writePos); sb.append(" consumed=").append(consumed).append(" produced=").append(produced); return sb.toString(); } Modified: udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2012-02-03 06:57:00 UTC (rev 69) +++ udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2012-02-03 09:09:37 UTC (rev 70) @@ -25,9 +25,9 @@ @Test public void testWithoutLoss()throws Exception{ - Logger.getLogger("udt").setLevel(Level.WARNING); + Logger.getLogger("udt").setLevel(Level.INFO); UDTReceiver.dropRate=0; - num_packets=640; + num_packets=1000; TIMEOUT=Integer.MAX_VALUE; doTest(); } @@ -37,9 +37,9 @@ public void testWithLoss()throws Exception{ UDTReceiver.dropRate=3; TIMEOUT=Integer.MAX_VALUE; - num_packets=512; + num_packets=100; //set log level - Logger.getLogger("udt").setLevel(Level.WARNING); + Logger.getLogger("udt").setLevel(Level.INFO); doTest(); } @@ -48,9 +48,9 @@ public void testLargeDataSet()throws Exception{ UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; - num_packets=3*1024; + num_packets=100; //set log level - Logger.getLogger("udt").setLevel(Level.WARNING); + Logger.getLogger("udt").setLevel(Level.INFO); doTest(); } Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2012-02-03 06:57:00 UTC (rev 69) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2012-02-03 09:09:37 UTC (rev 70) @@ -6,6 +6,7 @@ import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -18,7 +19,7 @@ */ public class UDPTest { - final int num_packets=10*10*1000; + final int num_packets=1000; final int packetSize=UDPEndPoint.DATAGRAM_SIZE; @Test @@ -48,6 +49,7 @@ dgSendInterval.end(); dgSendTime.begin(); s.send(dp); + Thread.sleep(5); dgSendTime.end(); dgSendInterval.begin(); } @@ -76,11 +78,10 @@ public void run(){ try{ byte[]buf=new byte[packetSize]; - DatagramPacket dp=new DatagramPacket(buf,buf.length); while(true){ + DatagramPacket dp=new DatagramPacket(buf,buf.length); serverSocket.receive(dp); handoff.offer(dp); - total+=dp.getLength(); } } catch(Exception e){ @@ -91,6 +92,7 @@ }; Thread t=new Thread(serverProcess); t.start(); + System.out.println("Server started."); } private final BlockingQueue<DatagramPacket> handoff=new SynchronousQueue<DatagramPacket>(); @@ -99,11 +101,15 @@ Runnable serverProcess=new Runnable(){ public void run(){ try{ + int counter=0; long start=System.currentTimeMillis(); - while(true){ - DatagramPacket dp=handoff.poll(); - if(dp!=null)total+=dp.getLength(); - if(total==N)break; + while(counter<num_packets){ + DatagramPacket dp=handoff.poll(10, TimeUnit.MILLISECONDS); + if(dp!=null){ + total+=dp.getLength(); + counter++; + System.out.println("Count: "+counter); + } } long end=System.currentTimeMillis(); System.out.println("Server time: "+(end-start)+" ms."); @@ -117,6 +123,7 @@ }; Thread t=new Thread(serverProcess); t.start(); + System.out.println("Hand-off thread started."); } Modified: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java =================================================================== --- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2012-02-03 06:57:00 UTC (rev 69) +++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2012-02-03 09:09:37 UTC (rev 70) @@ -33,6 +33,7 @@ DataPacket no=fw.getForProducer(); assertNull("Window should be full",no); + assertTrue(fw.isFull()); DataPacket c1=fw.consumeData(); //must be p1 @@ -73,14 +74,21 @@ DataPacket p4=fw.getForProducer(); assertNotNull(p4); fw.produce(); + fw.consumeData(); + + DataPacket p5=fw.getForProducer(); + assertNotNull(p5); + fw.produce(); + //which is again p1 - assertTrue(p4==p1); + assertTrue(p5==p1); } private volatile boolean fail=false; - public void testConcurrentReadWrite()throws InterruptedException{ + @Test + public void testConcurrentReadWrite_20()throws InterruptedException{ final FlowWindow fw=new FlowWindow(20, 64); Thread reader=new Thread(new Runnable(){ public void run(){ @@ -107,6 +115,34 @@ } + @Test + public void testConcurrentReadWrite_2()throws InterruptedException{ + final FlowWindow fw=new FlowWindow(2, 64); + Thread reader=new Thread(new Runnable(){ + public void run(){ + doRead(fw); + } + }); + reader.setName("reader"); + Thread writer=new Thread(new Runnable(){ + public void run(){ + doWrite(fw); + } + }); + writer.setName("writer"); + + writer.start(); + reader.start(); + + int c=0; + while(read && write && c<10){ + Thread.sleep(1000); + c++; + } + assertFalse("An error occured in reader or writer",fail); + + } + volatile boolean read=true; volatile boolean write=true; int N=100000; @@ -123,6 +159,7 @@ } }catch(Throwable ex){ ex.printStackTrace(); + System.out.println(fw); fail=true; } System.out.println("Exiting reader..."); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2012-02-03 06:57:07
|
Revision: 69 http://udt-java.svn.sourceforge.net/udt-java/?rev=69&view=rev Author: bschuller Date: 2012-02-03 06:57:00 +0000 (Fri, 03 Feb 2012) Log Message: ----------- use Junit4 Modified Paths: -------------- udt-java/trunk/LICENSE udt-java/trunk/pom.xml udt-java/trunk/src/test/java/echo/TestEchoServer.java udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java udt-java/trunk/src/test/java/udt/TestList.java udt-java/trunk/src/test/java/udt/TestReceiverLossList.java udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java udt-java/trunk/src/test/java/udt/TestUDTInputStream.java udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java udt-java/trunk/src/test/java/udt/UDTTestBase.java udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java udt-java/trunk/src/test/java/udt/performance/TCPTest.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java Modified: udt-java/trunk/LICENSE =================================================================== --- udt-java/trunk/LICENSE 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/LICENSE 2012-02-03 06:57:00 UTC (rev 69) @@ -1,5 +1,5 @@ /********************************************************************************* - * Copyright (c) 2010 Forschungszentrum Juelich GmbH + * Copyright (c) 2010-2012 Forschungszentrum Juelich GmbH * All rights reserved. * * Redistribution and use in source and binary forms, with or without Modified: udt-java/trunk/pom.xml =================================================================== --- udt-java/trunk/pom.xml 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/pom.xml 2012-02-03 06:57:00 UTC (rev 69) @@ -27,7 +27,7 @@ <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> - <version>3.8.1</version> + <version>4.8.2</version> <scope>test</scope> </dependency> </dependencies> Modified: udt-java/trunk/src/test/java/echo/TestEchoServer.java =================================================================== --- udt-java/trunk/src/test/java/echo/TestEchoServer.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/echo/TestEchoServer.java 2012-02-03 06:57:00 UTC (rev 69) @@ -4,12 +4,16 @@ import java.io.PrintWriter; import java.net.InetAddress; -import junit.framework.TestCase; +import junit.framework.Assert; + +import org.junit.Test; + import udt.UDTClient; import udt.util.Util; -public class TestEchoServer extends TestCase { +public class TestEchoServer { + @Test public void test1()throws Exception{ EchoServer es=new EchoServer(65321); es.start(); @@ -22,9 +26,9 @@ System.out.println("Message sent."); client.getInputStream().setBlocking(false); String line=Util.readLine(client.getInputStream()); - assertNotNull(line); + Assert.assertNotNull(line); System.out.println(line); - assertEquals("test",line); + Assert.assertEquals("test",line); } } Modified: udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java =================================================================== --- udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/echo/TestEchoServerMultiClient.java 2012-02-03 06:57:00 UTC (rev 69) @@ -4,12 +4,16 @@ import java.io.PrintWriter; import java.net.InetAddress; -import junit.framework.TestCase; +import junit.framework.Assert; + +import org.junit.Test; + import udt.UDTClient; import udt.util.Util; -public class TestEchoServerMultiClient extends TestCase { +public class TestEchoServerMultiClient { + @Test public void testTwoClients()throws Exception{ EchoServer es=new EchoServer(65321); es.start(); @@ -33,8 +37,8 @@ System.out.println("Message sent."); client.getInputStream().setBlocking(false); String line=Util.readLine(client.getInputStream()); - assertNotNull(line); + Assert.assertNotNull(line); System.out.println(line); - assertEquals("test",line); + Assert.assertEquals("test",line); } } Modified: udt-java/trunk/src/test/java/udt/TestList.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestList.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestList.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,8 +1,13 @@ package udt; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; -import junit.framework.TestCase; +import org.junit.Test; + import udt.packets.DataPacket; import udt.packets.KeepAlive; import udt.receiver.AckHistoryEntry; @@ -15,8 +20,9 @@ /* * tests for the various list and queue classes */ -public class TestList extends TestCase{ +public class TestList { + @Test public void testCircularArray(){ CircularArray<Integer>c=new CircularArray<Integer>(5); for(int i=0;i<5;i++)c.add(i); @@ -50,14 +56,14 @@ for(int i=0;i<values.length;i++){ p.add(values[i]); } - assertEquals(4.0d, p.computeMedianTimeInterval()); + assertEquals(4.0d, p.computeMedianTimeInterval(), 0.001d); long[] arrivaltimes = {12, 12, 12, 12}; PacketPairWindow p1=new PacketPairWindow(16); for(int i=0;i<values.length;i++){ p1.add(arrivaltimes[i]); } - assertEquals(12.0d, p1.computeMedianTimeInterval()); + assertEquals(12.0d, p1.computeMedianTimeInterval(), 0.001d); } Modified: udt-java/trunk/src/test/java/udt/TestReceiverLossList.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestReceiverLossList.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestReceiverLossList.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,11 +1,15 @@ package udt; +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + import udt.receiver.ReceiverLossList; import udt.receiver.ReceiverLossListEntry; -import junit.framework.TestCase; -public class TestReceiverLossList extends TestCase { +public class TestReceiverLossList { + @Test public void test1(){ ReceiverLossList l=new ReceiverLossList(); ReceiverLossListEntry e1=new ReceiverLossListEntry(1); Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,8 +1,12 @@ package udt; +import static org.junit.Assert.assertEquals; + import java.io.File; import java.io.FileInputStream; +import org.junit.Test; + import udt.util.ReceiveFile; import udt.util.SendFile; import udt.util.UDTThreadFactory; @@ -11,6 +15,7 @@ volatile boolean serverStarted=false; + @Test public void test1()throws Exception{ runServer(); do{ Modified: udt-java/trunk/src/test/java/udt/TestUDTInputStream.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestUDTInputStream.java 2012-02-03 06:57:00 UTC (rev 69) @@ -6,10 +6,17 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import junit.framework.Assert; + +import org.junit.Test; + import udt.util.Util; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestUDTInputStream extends UDTTestBase{ + @Test public void test1()throws Exception{ UDTInputStream is=new UDTInputStream(null); byte[] data1="this is ".getBytes(); @@ -25,6 +32,7 @@ assertEquals(digest,readMD5); } + @Test public void test2()throws Exception{ UDTInputStream is=new UDTInputStream(null); byte[] data1=getRandomData(65537); @@ -40,6 +48,7 @@ assertEquals(digest,readMD5); } + @Test public void testInOrder()throws Exception{ UDTInputStream is=new UDTInputStream(null); is.setBlocking(false); @@ -57,6 +66,7 @@ assertEquals(digest,readMD5); } + @Test public void testRandomOrder()throws Exception{ UDTInputStream is=new UDTInputStream(null); is.setBlocking(false); @@ -76,7 +86,7 @@ } - + @Test public void testLargeDataSetTwoThreads()throws Exception{ final UDTInputStream is=new UDTInputStream(null); is.setBlocking(false); @@ -98,7 +108,7 @@ is.noMoreData(); }catch(Exception e){ e.printStackTrace(); - fail(); + Assert.fail(); } } }; Modified: udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestUDTServerSocket.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,11 +1,16 @@ package udt; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + import java.net.InetAddress; import java.security.MessageDigest; import java.util.Random; import java.util.logging.Level; import java.util.logging.Logger; +import org.junit.Test; + import udt.util.Util; public class TestUDTServerSocket extends UDTTestBase{ @@ -18,6 +23,7 @@ int TIMEOUT=20000; + @Test public void testWithoutLoss()throws Exception{ Logger.getLogger("udt").setLevel(Level.WARNING); UDTReceiver.dropRate=0; @@ -27,6 +33,7 @@ } //set an artificial loss rate + @Test public void testWithLoss()throws Exception{ UDTReceiver.dropRate=3; TIMEOUT=Integer.MAX_VALUE; @@ -37,6 +44,7 @@ } //send even more data + @Test public void testLargeDataSet()throws Exception{ UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; @@ -97,6 +105,7 @@ Runnable serverProcess=new Runnable(){ public void run(){ try{ + System.out.println("Starting server."); long start=System.currentTimeMillis(); UDTSocket s=serverSocket.accept(); assertNotNull(s); @@ -121,7 +130,6 @@ } catch(Exception e){ e.printStackTrace(); - fail(); serverRunning=false; } } Modified: udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/TestUdpEndpoint.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,14 +1,20 @@ package udt; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.net.DatagramPacket; import java.net.InetAddress; import java.util.logging.Level; import java.util.logging.Logger; +import org.junit.Test; + import udt.packets.Destination; public class TestUdpEndpoint extends UDTTestBase{ + @Test public void testClientServerMode()throws Exception{ //select log level @@ -41,6 +47,7 @@ * just check how fast we can send out UDP packets from the endpoint * @throws Exception */ + @Test public void testRawSendRate()throws Exception{ Logger.getLogger("udt").setLevel(Level.WARNING); System.out.println("Checking raw UDP send rate..."); @@ -65,11 +72,12 @@ Thread.sleep(1000); } - //no rendezvous yet... - public void x_testRendezvousConnect()throws Exception{ + //@Test() + public void testRendezvousConnect()throws Exception{ } + @Test public void testBindToAnyPort()throws Exception{ UDPEndPoint ep=new UDPEndPoint(InetAddress.getByName("localhost")); int port=ep.getLocalPort(); Modified: udt-java/trunk/src/test/java/udt/UDTTestBase.java =================================================================== --- udt-java/trunk/src/test/java/udt/UDTTestBase.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/UDTTestBase.java 2012-02-03 06:57:00 UTC (rev 69) @@ -7,12 +7,10 @@ import udt.util.Util; -import junit.framework.TestCase; - /** * some additional utilities useful for testing */ -public abstract class UDTTestBase extends TestCase{ +public abstract class UDTTestBase { //get an array filled with random data protected byte[] getRandomData(int size){ Modified: udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java =================================================================== --- udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/packets/TestControlPacketType.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,18 +1,22 @@ package udt.packets; -import udt.packets.ControlPacket; +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + import udt.packets.ControlPacket.ControlPacketType; -import junit.framework.TestCase; -public class TestControlPacketType extends TestCase { +public class TestControlPacketType { + @Test public void testSequenceNumber1(){ ControlPacket p=new DummyControlPacket(); byte[]x=p.getHeader(); byte highest=x[0]; assertEquals(128, highest & 0x80); } - + + @Test public void testControlPacketTypes(){ ControlPacketType t=ControlPacketType.CONNECTION_HANDSHAKE; assertEquals(0,t.ordinal()); Modified: udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java =================================================================== --- udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/packets/TestDataPacket.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,10 +1,11 @@ package udt.packets; -import junit.framework.TestCase; -import udt.packets.DataPacket; +import static org.junit.Assert.assertEquals; -public class TestDataPacket extends TestCase { +import org.junit.Test; +public class TestDataPacket { + @Test public void testSequenceNumber1(){ DataPacket p=new DataPacket(); p.setPacketSequenceNumber(1); @@ -17,6 +18,7 @@ assertEquals(1, lowest); } + @Test public void testEncoded(){ DataPacket p=new DataPacket(); p.setPacketSequenceNumber(1); @@ -30,9 +32,9 @@ System.out.println("String s = " + s); } - + + @Test public void testDecode1(){ - DataPacket testPacket1=new DataPacket(); testPacket1.setPacketSequenceNumber(127); testPacket1.setDestinationID(1); @@ -74,6 +76,7 @@ } + @Test public void testEncodeDecode1(){ DataPacket dp=new DataPacket(); dp.setPacketSequenceNumber(127); Modified: udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java =================================================================== --- udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/packets/TestPacketFactory.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,24 +1,21 @@ package udt.packets; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.util.ArrayList; import java.util.List; -import junit.framework.TestCase; +import org.junit.Test; + import udt.UDTPacket; -import udt.packets.Acknowledgement; -import udt.packets.Acknowledgment2; -import udt.packets.ConnectionHandshake; -import udt.packets.DataPacket; -import udt.packets.MessageDropRequest; -import udt.packets.NegativeAcknowledgement; -import udt.packets.PacketFactory; -import udt.packets.Shutdown; -public class TestPacketFactory extends TestCase { +public class TestPacketFactory { + @Test public void testData(){ String test="sdjfsdjfldskjflds"; - + byte[]data=test.getBytes(); data[0]=(byte)(data[0] & 0x7f); UDTPacket p=PacketFactory.createPacket(data); @@ -27,14 +24,14 @@ assertTrue(p instanceof DataPacket); assertEquals(test,t); } - - + + @Test public void testConnectionHandshake(){ ConnectionHandshake p1 = new ConnectionHandshake(); p1.setMessageNumber(9876); p1.setTimeStamp(3456); p1.setDestinationID(1); - + p1.setConnectionType(1); p1.setSocketType(1); p1.setInitialSeqNo(321); @@ -42,16 +39,17 @@ p1.setMaxFlowWndSize(128); p1.setSocketID(1); p1.setUdtVersion(4); - - + + byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); ConnectionHandshake p2=(ConnectionHandshake)p; assertEquals(p1,p2); - + } - + + @Test public void testAcknowledgement(){ Acknowledgement p1 = new Acknowledgement(); p1.setAckSequenceNumber(1234); @@ -64,28 +62,30 @@ p1.setPacketReceiveRate(1000); p1.setRoundTripTime(1000); p1.setRoundTripTimeVar(500); - + byte[]p1_data=p1.getEncoded(); UDTPacket p=PacketFactory.createPacket(p1_data); Acknowledgement p2=(Acknowledgement)p; assertEquals(p1,p2); } - + + @Test public void testAcknowledgementOfAcknowledgement(){ Acknowledgment2 p1 = new Acknowledgment2(); p1.setAckSequenceNumber(1230); p1.setMessageNumber(9871); p1.setTimeStamp(3451); p1.setDestinationID(1); - + byte[]p1_data=p1.getEncoded(); UDTPacket p=PacketFactory.createPacket(p1_data); Acknowledgment2 p2=(Acknowledgment2)p; assertEquals(p1,p2); - - + + } - + + @Test public void testNegativeAcknowledgement(){ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); @@ -95,15 +95,16 @@ p1.addLossInfo(6); p1.addLossInfo(7, 10); byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); NegativeAcknowledgement p2=(NegativeAcknowledgement)p; assertEquals(p1,p2); - + assertEquals((Integer)5, (Integer)p2.getDecodedLossInfo().get(0)); assertEquals(6, p2.getDecodedLossInfo().size()); } - + + @Test public void testNegativeAcknowledgement2(){ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); @@ -116,18 +117,19 @@ loss.add(8l); loss.add(9l); loss.add(11l); - + p1.addLossInfo(loss); byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); NegativeAcknowledgement p2=(NegativeAcknowledgement)p; assertEquals(p1,p2); - + assertEquals((Integer)5, (Integer)p2.getDecodedLossInfo().get(0)); assertEquals(6, p2.getDecodedLossInfo().size()); } + @Test public void testNegativeAcknowledgement3(){ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); p1.setMessageNumber(9872); @@ -137,42 +139,43 @@ p1.addLossInfo(6); p1.addLossInfo(147, 226); byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); NegativeAcknowledgement p2=(NegativeAcknowledgement)p; assertEquals(p1,p2); - - + + } - - public void testShutdown(){ + + @Test + public void testShutdown(){ Shutdown p1 = new Shutdown(); p1.setMessageNumber(9874); p1.setTimeStamp(3453); p1.setDestinationID(3); - - + + byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); Shutdown p2=(Shutdown)p; assertEquals(p1,p2); } - - - + + + @Test public void testMessageDropRequest(){ MessageDropRequest p1=new MessageDropRequest(); p1.setMessageNumber(9876); p1.setTimeStamp(3456); p1.setDestinationID(4); - + p1.setMsgFirstSeqNo(2); p1.setMsgLastSeqNo(3); - - + + byte[]p1_data=p1.getEncoded(); - + UDTPacket p=PacketFactory.createPacket(p1_data); assertTrue(p instanceof MessageDropRequest); MessageDropRequest p2=(MessageDropRequest)p; Modified: udt-java/trunk/src/test/java/udt/performance/TCPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TCPTest.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/performance/TCPTest.java 2012-02-03 06:57:00 UTC (rev 69) @@ -6,17 +6,18 @@ import java.net.Socket; import java.util.Random; -import junit.framework.TestCase; +import org.junit.Test; /** * send some data over a TCP connection and measure performance * */ -public class TCPTest extends TestCase { +public class TCPTest { int BUFSIZE=1024; int num_packets=10*1000; + @Test public void test1()throws Exception{ runServer(); //client socket @@ -59,7 +60,7 @@ } catch(Exception e){ e.printStackTrace(); - fail(); + serverRunning=false; } } }; Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,5 +1,7 @@ package udt.performance; +import static org.junit.Assert.*; + import java.io.File; import java.net.InetAddress; import java.security.MessageDigest; @@ -9,6 +11,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.junit.Test; + import udt.UDTClient; import udt.UDTInputStream; import udt.UDTReceiver; @@ -31,6 +35,7 @@ int READ_BUFFERSIZE=1*1024*1024; + @Test public void test1()throws Exception{ Logger.getLogger("udt").setLevel(Level.INFO); // System.setProperty("udt.receiver.storeStatistics","true"); Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeDataCC1.java 2012-02-03 06:57:00 UTC (rev 69) @@ -3,6 +3,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.junit.Test; + import udt.UDTReceiver; import udt.UDTSession; import udt.cc.SimpleTCP; @@ -22,6 +24,8 @@ int READ_BUFFERSIZE=1*1024*1024; + @Override + @Test public void test1()throws Exception{ Logger.getLogger("udt").setLevel(Level.INFO); UDTReceiver.dropRate=0; Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2012-02-03 06:57:00 UTC (rev 69) @@ -7,7 +7,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; -import junit.framework.TestCase; +import org.junit.Test; + import udt.UDPEndPoint; import udt.packets.DataPacket; import udt.util.MeanValue; @@ -15,11 +16,12 @@ /** * send some data over a UDP connection and measure performance */ -public class UDPTest extends TestCase { +public class UDPTest { final int num_packets=10*10*1000; final int packetSize=UDPEndPoint.DATAGRAM_SIZE; + @Test public void test1()throws Exception{ runServer(); runThirdThread(); Modified: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java =================================================================== --- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2011-12-01 19:52:51 UTC (rev 68) +++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2012-02-03 06:57:00 UTC (rev 69) @@ -1,12 +1,20 @@ package udt.sender; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import java.util.concurrent.TimeoutException; -import junit.framework.TestCase; +import org.junit.Test; + import udt.packets.DataPacket; -public class TestFlowWindow extends TestCase { +public class TestFlowWindow { + @Test public void testFillWindow()throws InterruptedException, TimeoutException{ FlowWindow fw=new FlowWindow(3, 128); DataPacket p1=fw.getForProducer(); @@ -38,6 +46,7 @@ assertTrue(fw.isEmpty()); } + @Test public void testOverflow()throws InterruptedException, TimeoutException{ FlowWindow fw=new FlowWindow(3, 64); DataPacket p1=fw.getForProducer(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2011-12-01 19:52:58
|
Revision: 68 http://udt-java.svn.sourceforge.net/udt-java/?rev=68&view=rev Author: bschuller Date: 2011-12-01 19:52:51 +0000 (Thu, 01 Dec 2011) Log Message: ----------- apply a few fixes from P. Elgee Modified Paths: -------------- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java Modified: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2011-12-01 13:55:08 UTC (rev 67) +++ udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2011-12-01 19:52:51 UTC (rev 68) @@ -78,7 +78,7 @@ writePos++; if(writePos==length)writePos=0; validEntries++; - isFull=validEntries==length-1; + isFull=validEntries==length; isEmpty=false; produced++; }finally{ Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-12-01 13:55:08 UTC (rev 67) +++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-12-01 19:52:51 UTC (rev 68) @@ -120,7 +120,6 @@ } else return null; } - numValidChunks.decrementAndGet(); return r; } Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-12-01 13:55:08 UTC (rev 67) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-12-01 19:52:51 UTC (rev 68) @@ -22,7 +22,7 @@ boolean running=false; //how many - int num_packets=500; + int num_packets=50; //how large is a single packet int size=20*1024*1024; Modified: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java =================================================================== --- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2011-12-01 13:55:08 UTC (rev 67) +++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2011-12-01 19:52:51 UTC (rev 68) @@ -110,9 +110,7 @@ while( (p=fw.consumeData())==null){ Thread.sleep(1); } - synchronized (p) { - assertEquals(i,p.getMessageNumber()); - } + assertEquals(i,p.getMessageNumber()); } }catch(Throwable ex){ ex.printStackTrace(); @@ -131,11 +129,9 @@ do{ p=fw.getForProducer(); if(p!=null){ - synchronized(p){ - p.setData(("test"+i).getBytes()); - p.setMessageNumber(i); - fw.produce(); - } + p.setData(("test"+i).getBytes()); + p.setMessageNumber(i); + fw.produce(); } }while(p==null); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2011-12-01 13:55:19
|
Revision: 67 http://udt-java.svn.sourceforge.net/udt-java/?rev=67&view=rev Author: bschuller Date: 2011-12-01 13:55:08 +0000 (Thu, 01 Dec 2011) Log Message: ----------- fix test failure Modified Paths: -------------- udt-java/trunk/src/main/java/udt/packets/DataPacket.java Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-11-17 07:26:46 UTC (rev 66) +++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-12-01 13:55:08 UTC (rev 67) @@ -61,7 +61,6 @@ public DataPacket(byte[] encodedData, int length){ decode(encodedData,length); - dataLength=length; } void decode(byte[]encodedData,int length){ @@ -69,8 +68,9 @@ messageNumber=PacketUtil.decode(encodedData, 4); timeStamp=PacketUtil.decode(encodedData, 8); destinationID=PacketUtil.decode(encodedData, 12); - data=new byte[length-16]; - System.arraycopy(encodedData, 16, data, 0, data.length); + dataLength=length-16; + data=new byte[dataLength]; + System.arraycopy(encodedData, 16, data, 0, dataLength); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2011-11-17 07:26:52
|
Revision: 66 http://udt-java.svn.sourceforge.net/udt-java/?rev=66&view=rev Author: bschuller Date: 2011-11-17 07:26:46 +0000 (Thu, 17 Nov 2011) Log Message: ----------- fix single byte read() Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTInputStream.java Modified: udt-java/trunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTInputStream.java 2011-10-16 11:41:23 UTC (rev 65) +++ udt-java/trunk/src/main/java/udt/UDTInputStream.java 2011-11-17 07:26:46 UTC (rev 66) @@ -82,7 +82,7 @@ b=read(single); if(b>0){ - return single[0]; + return single[0] & 0xFF; } else { return b; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2011-10-16 11:41:29
|
Revision: 65 http://udt-java.svn.sourceforge.net/udt-java/?rev=65&view=rev Author: bschuller Date: 2011-10-16 11:41:23 +0000 (Sun, 16 Oct 2011) Log Message: ----------- remove Thread.sleep call, and reduce waiting interval while waiting for session to become ready Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTClient.java Modified: udt-java/trunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTClient.java 2011-08-16 17:56:07 UTC (rev 64) +++ udt-java/trunk/src/main/java/udt/UDTClient.java 2011-10-16 11:41:23 UTC (rev 65) @@ -85,10 +85,9 @@ clientSession.connect(); //wait for handshake while(!clientSession.isReady()){ - Thread.sleep(500); + Thread.sleep(5); } logger.info("The UDTClient is connected"); - Thread.sleep(500); } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2011-08-16 17:56:14
|
Revision: 64 http://udt-java.svn.sourceforge.net/udt-java/?rev=64&view=rev Author: bschuller Date: 2011-08-16 17:56:07 +0000 (Tue, 16 Aug 2011) Log Message: ----------- fix two bugs: thanks to ajsenf (Alexander Senf) see https://sourceforge.net/projects/udt-java/forums/forum/1109269/topic/4615162?message=10597365 Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTClient.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Modified: udt-java/trunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTClient.java 2011-08-13 00:24:18 UTC (rev 63) +++ udt-java/trunk/src/main/java/udt/UDTClient.java 2011-08-16 17:56:07 UTC (rev 64) @@ -36,6 +36,7 @@ import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -115,15 +116,15 @@ } /** - * flush outstanding data (and make sure it is acknowledged) + * flush outstanding data, with the specified maximum waiting time + * @param timeOut - timeout in millis (if smaller than 0, no timeout is used) * @throws IOException * @throws InterruptedException */ - public void flush()throws IOException, InterruptedException{ + public void flush()throws IOException, InterruptedException, TimeoutException{ clientSession.getSocket().flush(); } - public void shutdown()throws IOException{ if (clientSession.isReady()&& clientSession.active==true) Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2011-08-13 00:24:18 UTC (rev 63) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-08-16 17:56:07 UTC (rev 64) @@ -39,7 +39,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -110,11 +111,9 @@ private volatile CountDownLatch startLatch=new CountDownLatch(1); //used by the sender to wait for an ACK - private final AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>(); + private final ReentrantLock ackLock=new ReentrantLock(); + private final Condition ackCondition=ackLock.newCondition(); - //used by the sender to wait for an ACK of a certain sequence number - private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); - private final boolean storeStatistics; private final int chunksize; @@ -130,8 +129,6 @@ flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize); lastAckSequenceNumber=session.getInitialSequenceNumber(); currentSequenceNumber=session.getInitialSequenceNumber()-1; - waitForAckLatch.set(new CountDownLatch(1)); - waitForSeqAckLatch.set(new CountDownLatch(1)); storeStatistics=Boolean.getBoolean("udt.sender.storeStatistics"); initMetrics(); doStart(); @@ -278,8 +275,9 @@ } protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{ - waitForAckLatch.get().countDown(); - waitForSeqAckLatch.get().countDown(); + ackLock.lock(); + ackCondition.signal(); + ackLock.unlock(); CongestionControl cc=session.getCongestionControl(); long rtt=acknowledgement.getRoundTripTime(); @@ -407,6 +405,8 @@ } } + private final DataPacket retransmit=new DataPacket(); + /** * re-transmit an entry from the sender loss list * @param entry @@ -416,13 +416,11 @@ //retransmit the packet and remove it from the list byte[]data=sendBuffer.get(seqNumber); if(data!=null){ - //System.out.println("re-transmit "+data); - DataPacket packet=new DataPacket(); - packet.setPacketSequenceNumber(seqNumber); - packet.setSession(session); - packet.setDestinationID(session.getDestination().getSocketID()); - packet.setData(data); - endpoint.doSend(packet); + retransmit.setPacketSequenceNumber(seqNumber); + retransmit.setSession(session); + retransmit.setDestinationID(session.getDestination().getSocketID()); + retransmit.setData(data); + endpoint.doSend(retransmit); statistics.incNumberOfRetransmittedDataPackets(); } }catch (Exception e) { @@ -486,18 +484,37 @@ */ public void waitForAck(long sequenceNumber)throws InterruptedException{ while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){ - waitForSeqAckLatch.set(new CountDownLatch(1)); - waitForSeqAckLatch.get().await(10, TimeUnit.MILLISECONDS); + ackLock.lock(); + try{ + ackCondition.await(100, TimeUnit.MICROSECONDS); + }finally{ + ackLock.unlock(); + } } } + public void waitForAck(long sequenceNumber, int timeout)throws InterruptedException{ + while(!session.isShutdown() && !haveAcknowledgementFor(sequenceNumber)){ + ackLock.lock(); + try{ + ackCondition.await(timeout, TimeUnit.MILLISECONDS); + }finally{ + ackLock.unlock(); + } + } + } + /** * wait for the next acknowledge * @throws InterruptedException */ public void waitForAck()throws InterruptedException{ - waitForAckLatch.set(new CountDownLatch(1)); - waitForAckLatch.get().await(200, TimeUnit.MICROSECONDS); + ackLock.lock(); + try{ + ackCondition.await(200, TimeUnit.MICROSECONDS); + }finally{ + ackLock.unlock(); + } } Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2011-08-13 00:24:18 UTC (rev 63) +++ udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2011-08-16 17:56:07 UTC (rev 64) @@ -61,16 +61,16 @@ } void decode(byte[]data){ + ackSequenceNumber=PacketUtil.decode(data, 0); } public boolean forSender(){ return false; } - private static final byte[]empty=new byte[0]; @Override public byte[] encodeControlInformation(){ - return empty; + return PacketUtil.encode(ackSequenceNumber); } } Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-08-13 00:24:18 UTC (rev 63) +++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-08-16 17:56:07 UTC (rev 64) @@ -54,7 +54,7 @@ try{ long seq=data.getSequenceNumber(); //if already have this chunk, discard it - if(SequenceNumber.compare(seq, initialSequenceNumber)<0)return true; + if(SequenceNumber.compare(seq, highestReadSequenceNumber)<=0)return true; //else compute insert position int offset=(int)SequenceNumber.seqOffset(initialSequenceNumber, seq); int insert=offset% size; @@ -120,6 +120,7 @@ } else return null; } + numValidChunks.decrementAndGet(); return r; } Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-08-13 00:24:18 UTC (rev 63) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-08-16 17:56:07 UTC (rev 64) @@ -5,6 +5,7 @@ import java.security.MessageDigest; import java.text.NumberFormat; import java.util.Random; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -21,10 +22,10 @@ boolean running=false; //how many - int num_packets=100; + int num_packets=500; //how large is a single packet - int size=1*1024*1024; + int size=20*1024*1024; int TIMEOUT=Integer.MAX_VALUE; @@ -36,7 +37,12 @@ // System.setProperty("udt.sender.storeStatistics","true"); UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; - doTest(); + try{ + doTest(); + }catch(TimeoutException te){ + te.printStackTrace(); + fail(); + } } private final NumberFormat format=NumberFormat.getNumberInstance(); @@ -59,6 +65,7 @@ long start=System.currentTimeMillis(); System.out.println("Sending <"+num_packets+"> packets of <"+format.format(size/1024.0/1024.0)+"> Mbytes each"); long end=0; + if(serverRunning){ for(int i=0;i<num_packets;i++){ long block=System.currentTimeMillis(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-08-13 00:24:24
|
Revision: 63 http://udt-java.svn.sourceforge.net/udt-java/?rev=63&view=rev Author: thomasowens Date: 2011-08-13 00:24:18 +0000 (Sat, 13 Aug 2011) Log Message: ----------- Added a dependency on log4j to the POM. It's not used yet, but I hope to use it in the tests, if not also in the main code. Modified Paths: -------------- udt-java/skunk/pom.xml Modified: udt-java/skunk/pom.xml =================================================================== --- udt-java/skunk/pom.xml 2011-08-13 00:23:21 UTC (rev 62) +++ udt-java/skunk/pom.xml 2011-08-13 00:24:18 UTC (rev 63) @@ -3,7 +3,6 @@ <modelVersion>4.0.0</modelVersion> <groupId>udt-java</groupId> <artifactId>udt-java</artifactId> - <packaging>jar</packaging> <name>UDT Java implementation</name> <version>0.6-SNAPSHOT</version> <url>http://sourceforge.net/projects/udt-java</url> @@ -30,6 +29,11 @@ <version>3.8.1</version> <scope>test</scope> </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.16</version> + </dependency> </dependencies> <build> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-08-13 00:23:28
|
Revision: 62 http://udt-java.svn.sourceforge.net/udt-java/?rev=62&view=rev Author: thomasowens Date: 2011-08-13 00:23:21 +0000 (Sat, 13 Aug 2011) Log Message: ----------- Updated tests to reflect that UDPEndpoint was replaced by UDPMultiplexer. Modified Paths: -------------- udt-java/skunk/src/test/java/echo/EchoServer.java udt-java/skunk/src/test/java/udt/TestSendFileReceiveFile.java udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java udt-java/skunk/src/test/java/udt/performance/UDPTest.java Modified: udt-java/skunk/src/test/java/echo/EchoServer.java =================================================================== --- udt-java/skunk/src/test/java/echo/EchoServer.java 2011-08-05 06:54:24 UTC (rev 61) +++ udt-java/skunk/src/test/java/echo/EchoServer.java 2011-08-13 00:23:21 UTC (rev 62) @@ -11,81 +11,85 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import udt.UDTInputStream; -import udt.UDTOutputStream; import udt.UDTServerSocket; -import udt.UDTSocket; import udt.util.UDTThreadFactory; -public class EchoServer implements Runnable{ +public class EchoServer implements Runnable { - final ExecutorService pool=Executors.newFixedThreadPool(2); + final ExecutorService pool = Executors.newFixedThreadPool(2); final UDTServerSocket server; final Thread serverThread; - volatile boolean started=false; - volatile boolean stopped=false; + volatile boolean started = false; + volatile boolean stopped = false; - public EchoServer(int port)throws Exception{ - server=new UDTServerSocket(InetAddress.getByName("localhost"),port); - serverThread=UDTThreadFactory.get().newThread(this); + public EchoServer(int port) throws Exception { + server = new UDTServerSocket(InetAddress.getByName("localhost"), port); + serverThread = UDTThreadFactory.get().newThread(this); } - public void start(){ + public void start() { serverThread.start(); } - - public void stop(){ - stopped=true; + + public void stop() { + stopped = true; } - public void run(){ - try{ - started=true; - while(!stopped){ - final Socket socket=server.accept(); + + public void run() { + try { + started = true; + while (!stopped) { + final Socket socket = server.accept(); pool.execute(new Request(socket)); } - }catch(Exception ex){ + } catch (Exception ex) { ex.printStackTrace(); } } - static String readLine(InputStream r)throws IOException{ - ByteArrayOutputStream bos=new ByteArrayOutputStream(); - while(true){ - int c=r.read(); - if(c<0 && bos.size()==0)return null; - if(c<0 || c==10)break; - else bos.write(c); + static String readLine(InputStream r) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + while (true) { + int c = r.read(); + if (c < 0 && bos.size() == 0) + return null; + if (c < 0 || c == 10) + break; + else + bos.write(c); } return bos.toString(); } + public static class Request implements Runnable { - public static class Request implements Runnable{ - final Socket socket; - public Request(Socket socket){ - this.socket=socket; + public Request(Socket socket) { + this.socket = socket; } - public void run(){ - try{ - System.out.println("Processing request from <"+socket.getRemoteSocketAddress().toString()+">"); - InputStream in=socket.getInputStream(); - OutputStream out=socket.getOutputStream(); - PrintWriter writer=new PrintWriter(new OutputStreamWriter(out)); - String line=readLine(in); - if(line!=null){ - System.out.println("ECHO: "+line); - //else echo back the line + public void run() { + try { + System.out.println("Processing request from <" + + socket.getRemoteSocketAddress().toString() + ">"); + InputStream in = socket.getInputStream(); + OutputStream out = socket.getOutputStream(); + PrintWriter writer = new PrintWriter( + new OutputStreamWriter(out)); + String line = readLine(in); + if (line != null) { + System.out.println("ECHO: " + line); + // else echo back the line writer.println(line); writer.flush(); } - System.out.println("Request from <"+socket.getRemoteSocketAddress().toString()+"> finished."); - }catch(Exception ex){ + System.out.println("Request from <" + + socket.getRemoteSocketAddress().toString() + + "> finished."); + } catch (Exception ex) { ex.printStackTrace(); } } Modified: udt-java/skunk/src/test/java/udt/TestSendFileReceiveFile.java =================================================================== --- udt-java/skunk/src/test/java/udt/TestSendFileReceiveFile.java 2011-08-05 06:54:24 UTC (rev 61) +++ udt-java/skunk/src/test/java/udt/TestSendFileReceiveFile.java 2011-08-13 00:23:21 UTC (rev 62) @@ -2,13 +2,14 @@ import java.io.File; import java.io.FileInputStream; +import java.util.logging.Logger; import udt.util.ReceiveFile; import udt.util.SendFile; import udt.util.UDTThreadFactory; public class TestSendFileReceiveFile extends UDTTestBase{ - + volatile boolean serverStarted=false; public void test1()throws Exception{ Modified: udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java =================================================================== --- udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java 2011-08-05 06:54:24 UTC (rev 61) +++ udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java 2011-08-13 00:23:21 UTC (rev 62) @@ -7,74 +7,80 @@ import udt.packets.UDTSocketAddress; -public class TestUdpEndpoint extends UDTTestBase{ +public class TestUdpEndpoint extends UDTTestBase { - public void testClientServerMode()throws Exception{ + public void testClientServerMode() throws Exception { - //select log level + // select log level Logger.getLogger("udt").setLevel(Level.INFO); - - UDPEndPoint server= UDPEndPoint.get(InetAddress.getByName("localhost"),65322); + + UDPMultiplexer server = UDPMultiplexer.get( + InetAddress.getByName("localhost"), 65322); server.start(); - UDTClient client=new UDTClient(InetAddress.getByName("localhost"),12346); + UDTClient client = new UDTClient(InetAddress.getByName("localhost"), + 12346); client.connect("localhost", 65322); - - //test a large message (resulting in multiple data packets) - int num_packets=100; - int N=num_packets*1024; - byte[]data=getRandomData(N); - + + // test a large message (resulting in multiple data packets) + int num_packets = 100; + int N = num_packets * 1024; + byte[] data = getRandomData(N); + client.sendBlocking(data); Thread.sleep(2000); System.out.println(client.getStatistics()); - System.out.println(server.getSessions().iterator().next().getStatistics()); - int sent=client.getStatistics().getNumberOfSentDataPackets(); - int received=server.getSessions().iterator().next().getStatistics().getNumberOfReceivedDataPackets(); + System.out.println(server.getSessions().iterator().next() + .getStatistics()); + int sent = client.getStatistics().getNumberOfSentDataPackets(); + int received = server.getSessions().iterator().next().getStatistics() + .getNumberOfReceivedDataPackets(); assertEquals(sent, received); - + server.stop(); Thread.sleep(2000); - } - - + } + /** * just check how fast we can send out UDP packets from the endpoint + * * @throws Exception */ - public void testRawSendRate()throws Exception{ + public void testRawSendRate() throws Exception { Logger.getLogger("udt").setLevel(Level.WARNING); System.out.println("Checking raw UDP send rate..."); - InetAddress localhost=InetAddress.getByName("localhost"); - UDPEndPoint endpoint=UDPEndPoint.get(localhost,65322); + InetAddress localhost = InetAddress.getByName("localhost"); + UDPMultiplexer endpoint = UDPMultiplexer.get(localhost, 65322); endpoint.start(); - int socketID = endpoint.getUniqueSocketID(); - UDTSocketAddress d1=new UDTSocketAddress(localhost,12345,socketID); - int dataSize=UDTSession.DEFAULT_DATAGRAM_SIZE; - DatagramPacket p=new DatagramPacket(getRandomData(dataSize),dataSize,d1.getAddress(),d1.getPort()); - int N=100000; - long start=System.currentTimeMillis(); - //send many packets as fast as we can - for(int i=0;i<N;i++){ + int socketID = endpoint.getUniqueSocketID(); + UDTSocketAddress d1 = new UDTSocketAddress(localhost, 12345, socketID); + int dataSize = UDTSession.DEFAULT_DATAGRAM_SIZE; + DatagramPacket p = new DatagramPacket(getRandomData(dataSize), + dataSize, d1.getAddress(), d1.getPort()); + int N = 100000; + long start = System.currentTimeMillis(); + // send many packets as fast as we can + for (int i = 0; i < N; i++) { endpoint.sendRaw(p); } - long end=System.currentTimeMillis(); - float rate=1000*N/(end-start); - System.out.println("PacketRate: "+(int)rate+" packets/sec."); - float dataRate=dataSize*rate/1024/1024; - System.out.println("Data Rate: "+(int)dataRate+" MBytes/sec."); + long end = System.currentTimeMillis(); + float rate = 1000 * N / (end - start); + System.out.println("PacketRate: " + (int) rate + " packets/sec."); + float dataRate = dataSize * rate / 1024 / 1024; + System.out.println("Data Rate: " + (int) dataRate + " MBytes/sec."); endpoint.stop(); Thread.sleep(1000); } - - //no rendezvous yet... - public void x_testRendezvousConnect()throws Exception{ - + + // no rendezvous yet... + public void x_testRendezvousConnect() throws Exception { + } - - public void testBindToAnyPort()throws Exception{ - UDPEndPoint ep=UDPEndPoint.get(InetAddress.getByName("localhost"),0); - int port=ep.getLocalPort(); - assertTrue(port>0); + + public void testBindToAnyPort() throws Exception { + UDPMultiplexer ep = UDPMultiplexer.get( + InetAddress.getByName("localhost"), 0); + int port = ep.getLocalPort(); + assertTrue(port > 0); } - + } Modified: udt-java/skunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/skunk/src/test/java/udt/performance/UDPTest.java 2011-08-05 06:54:24 UTC (rev 61) +++ udt-java/skunk/src/test/java/udt/performance/UDPTest.java 2011-08-13 00:23:21 UTC (rev 62) @@ -8,7 +8,7 @@ import java.util.concurrent.SynchronousQueue; import junit.framework.TestCase; -import udt.UDPEndPoint; +import udt.UDPMultiplexer; import udt.packets.DataPacket; import udt.util.MeanValue; @@ -17,30 +17,32 @@ */ public class UDPTest extends TestCase { - final int num_packets=10*10*1000; - final int packetSize=UDPEndPoint.DATAGRAM_SIZE; + final int num_packets = 10 * 10 * 1000; + final int packetSize = UDPMultiplexer.DATAGRAM_SIZE; - public void test1()throws Exception{ + public void test1() throws Exception { runServer(); runThirdThread(); - - //client socket - DatagramSocket s=new DatagramSocket(12345); - - //generate a test array with random content - N=num_packets*packetSize; - byte[]data=new byte[packetSize]; + + // client socket + DatagramSocket s = new DatagramSocket(12345); + + // generate a test array with random content + N = num_packets * packetSize; + byte[] data = new byte[packetSize]; new Random().nextBytes(data); - long start=System.currentTimeMillis(); - DatagramPacket dp=new DatagramPacket(new byte[packetSize],packetSize); + long start = System.currentTimeMillis(); + DatagramPacket dp = new DatagramPacket(new byte[packetSize], packetSize); dp.setAddress(InetAddress.getByName("localhost")); dp.setPort(65321); - System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes"); - MeanValue dgSendTime=new MeanValue("Datagram send time",false); - MeanValue dgSendInterval=new MeanValue("Datagram send interval",false); - - for(int i=0;i<num_packets;i++){ - DataPacket p=new DataPacket(); + System.out.println("Sending " + num_packets + " data blocks of <" + + packetSize + "> bytes"); + MeanValue dgSendTime = new MeanValue("Datagram send time", false); + MeanValue dgSendInterval = new MeanValue("Datagram send interval", + false); + + for (int i = 0; i < num_packets; i++) { + DataPacket p = new DataPacket(); p.setData(data); dp.setData(p.getEncoded()); dgSendInterval.end(); @@ -50,72 +52,76 @@ dgSendInterval.begin(); } System.out.println("Finished sending."); - while(serverRunning)Thread.sleep(10); + while (serverRunning) + Thread.sleep(10); System.out.println("Server stopped."); - long end=System.currentTimeMillis(); - System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms"); - float rate=N/1000/(end-start); - System.out.println("Rate "+rate+" Mbytes/sec "+(rate*8)+ " Mbit/sec"); - System.out.println("Rate "+num_packets+" packets/sec"); - System.out.println("Mean send time "+dgSendTime.get()); - System.out.println("Mean send interval "+dgSendInterval.get()); - System.out.println("Server received: "+total); + long end = System.currentTimeMillis(); + System.out.println("Done. Sending " + N / 1024 / 1024 + " Mbytes took " + + (end - start) + " ms"); + float rate = N / 1000 / (end - start); + System.out.println("Rate " + rate + " Mbytes/sec " + (rate * 8) + + " Mbit/sec"); + System.out.println("Rate " + num_packets + " packets/sec"); + System.out.println("Mean send time " + dgSendTime.get()); + System.out.println("Mean send interval " + dgSendInterval.get()); + System.out.println("Server received: " + total); } - int N=0; - long total=0; - volatile boolean serverRunning=true; + int N = 0; + long total = 0; + volatile boolean serverRunning = true; - private void runServer()throws Exception{ - //server socket - final DatagramSocket serverSocket=new DatagramSocket(65321); + private void runServer() throws Exception { + // server socket + final DatagramSocket serverSocket = new DatagramSocket(65321); - Runnable serverProcess=new Runnable(){ - public void run(){ - try{ - byte[]buf=new byte[packetSize]; - DatagramPacket dp=new DatagramPacket(buf,buf.length); - while(true){ + Runnable serverProcess = new Runnable() { + public void run() { + try { + byte[] buf = new byte[packetSize]; + DatagramPacket dp = new DatagramPacket(buf, buf.length); + while (true) { serverSocket.receive(dp); handoff.offer(dp); - total+=dp.getLength(); + total += dp.getLength(); } - } - catch(Exception e){ + } catch (Exception e) { e.printStackTrace(); } - serverRunning=false; + serverRunning = false; } }; - Thread t=new Thread(serverProcess); + Thread t = new Thread(serverProcess); t.start(); } - - private final BlockingQueue<DatagramPacket> handoff=new SynchronousQueue<DatagramPacket>(); - - private void runThirdThread()throws Exception{ - Runnable serverProcess=new Runnable(){ - public void run(){ - try{ - long start=System.currentTimeMillis(); - while(true){ - DatagramPacket dp=handoff.poll(); - if(dp!=null)total+=dp.getLength(); - if(total==N)break; + + private final BlockingQueue<DatagramPacket> handoff = new SynchronousQueue<DatagramPacket>(); + + private void runThirdThread() throws Exception { + Runnable serverProcess = new Runnable() { + public void run() { + try { + long start = System.currentTimeMillis(); + while (true) { + DatagramPacket dp = handoff.poll(); + if (dp != null) + total += dp.getLength(); + if (total == N) + break; } - long end=System.currentTimeMillis(); - System.out.println("Server time: "+(end-start)+" ms."); + long end = System.currentTimeMillis(); + System.out + .println("Server time: " + (end - start) + " ms."); - } - catch(Exception e){ + } catch (Exception e) { e.printStackTrace(); } - serverRunning=false; + serverRunning = false; } }; - Thread t=new Thread(serverProcess); + Thread t = new Thread(serverProcess); t.start(); - + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <pe...@us...> - 2011-08-05 06:54:32
|
Revision: 61 http://udt-java.svn.sourceforge.net/udt-java/?rev=61&view=rev Author: pete_ Date: 2011-08-05 06:54:24 +0000 (Fri, 05 Aug 2011) Log Message: ----------- Renamed UDPEndPoint to UDPMultiplexer, this will make the implementation easier to understand for new comers who've read UDTv4: Improvements in Performance and Usability. Modified Paths: -------------- udt-java/skunk/src/main/java/udt/ClientSession.java udt-java/skunk/src/main/java/udt/ServerSession.java udt-java/skunk/src/main/java/udt/UDTClient.java udt-java/skunk/src/main/java/udt/UDTCongestionControl.java udt-java/skunk/src/main/java/udt/UDTReceiver.java udt-java/skunk/src/main/java/udt/UDTSender.java udt-java/skunk/src/main/java/udt/UDTServerSocket.java udt-java/skunk/src/main/java/udt/UDTSession.java udt-java/skunk/src/main/java/udt/UDTSocket.java udt-java/skunk/src/main/java/udt/util/Util.java Added Paths: ----------- udt-java/skunk/src/main/java/udt/UDPMultiplexer.java Removed Paths: ------------- udt-java/skunk/src/main/java/udt/UDPEndPoint.java Modified: udt-java/skunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 06:54:24 UTC (rev 61) @@ -50,9 +50,9 @@ private static final Logger logger=Logger.getLogger(ClientSession.class.getName()); - private UDPEndPoint endPoint; + private UDPMultiplexer endPoint; - public ClientSession(UDPEndPoint endPoint, UDTSocketAddress dest)throws SocketException{ + public ClientSession(UDPMultiplexer endPoint, UDTSocketAddress dest)throws SocketException{ super("ClientSession localPort="+endPoint.getLocalPort(),dest); this.endPoint=endPoint; logger.info("Created "+toString()); Modified: udt-java/skunk/src/main/java/udt/ServerSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 06:54:24 UTC (rev 61) @@ -51,12 +51,12 @@ private static final Logger logger=Logger.getLogger(ServerSession.class.getName()); - private final UDPEndPoint endPoint; + private final UDPMultiplexer endPoint; //last received packet (for testing purposes) private UDTPacket lastPacket; - public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{ + public ServerSession(DatagramPacket dp, UDPMultiplexer endPoint)throws SocketException,UnknownHostException{ super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new UDTSocketAddress(dp.getAddress(),dp.getPort(),0)); this.endPoint=endPoint; logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort()); Deleted: udt-java/skunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 06:54:24 UTC (rev 61) @@ -1,540 +0,0 @@ -/********************************************************************************* - * Copyright (c) 2010 Forschungszentrum Juelich GmbH - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * (1) Redistributions of source code must retain the above copyright notice, - * this list of conditions and the disclaimer at the end. Redistributions in - * binary form must reproduce the above copyright notice, this list of - * conditions and the following disclaimer in the documentation and/or other - * materials provided with the distribution. - * - * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its - * contributors may be used to endorse or promote products derived from this - * software without specific prior written permission. - * - * DISCLAIMER - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - *********************************************************************************/ - -package udt; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.net.UnknownHostException; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.WeakHashMap; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.logging.Level; -import java.util.logging.Logger; - -import udt.packets.ConnectionHandshake; -import udt.packets.UDTSocketAddress; -import udt.packets.PacketFactory; -import udt.util.ObjectPool; -import udt.util.UDTThreadFactory; - -/** - * the UDPEndpoint takes care of sending and receiving UDP network packets, - * dispatching them to the correct {@link UDTSession} - */ -public class UDPEndPoint { - - //class fields - private static final Logger logger=Logger.getLogger(ClientSession.class.getName()); - public static final int DATAGRAM_SIZE=1400; - - - //class methods - private static final WeakHashMap<SocketAddress, UDPEndPoint> localEndpoints - = new WeakHashMap<SocketAddress, UDPEndPoint>(); - - public static UDPEndPoint get(DatagramSocket socket){ - SocketAddress localInetSocketAddress = null; - UDPEndPoint result = null; - if ( socket.isBound()){ - SocketAddress sa = socket.getLocalSocketAddress(); - if ( sa instanceof InetSocketAddress ){ - localInetSocketAddress = (InetSocketAddress) sa; - } else { - // Must be a special DatagramSocket impl or extended. - localInetSocketAddress = - new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); - } - synchronized (localEndpoints){ - result = localEndpoints.get(localInetSocketAddress); - } - } - if (result != null) return result; - try { - result = new UDPEndPoint(socket); - if (localInetSocketAddress == null){ - // The DatagramSocket was unbound, it should be bound now. - localInetSocketAddress = - new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); - } - } catch (SocketException ex) { - Logger.getLogger(UDPEndPoint.class.getName()).log(Level.SEVERE, null, ex); - } - if (result != null){ - synchronized (localEndpoints){ - UDPEndPoint exists = localEndpoints.get(localInetSocketAddress); - if (exists != null && exists.getSocket().equals(socket)) result = exists; - // Only cache if a record doesn't already exist. - else if (exists == null) localEndpoints.put(localInetSocketAddress, result); - } - } - return result; // may be null. - } - - - - public static UDPEndPoint get(InetAddress localAddress, int localPort){ - InetSocketAddress localInetSocketAddress = new InetSocketAddress(localAddress, localPort); - return get(localInetSocketAddress); - } - - public static UDPEndPoint get(SocketAddress localSocketAddress){ - InetSocketAddress localInetSocketAddress = null; - if (localSocketAddress instanceof InetSocketAddress){ - localInetSocketAddress = (InetSocketAddress) localSocketAddress; - } else if (localSocketAddress instanceof UDTSocketAddress){ - UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress; - localInetSocketAddress = - new InetSocketAddress(udtSA.getAddress(), udtSA.getPort()); - } - if (localInetSocketAddress == null) return null; - UDPEndPoint result = null; - synchronized (localEndpoints){ - result = localEndpoints.get(localInetSocketAddress); - } - if (result != null) return result; - try { - result = new UDPEndPoint(localInetSocketAddress); - if (localInetSocketAddress.getPort() == 0 || - localInetSocketAddress.getAddress().isAnyLocalAddress()){ - // ephemeral port or wildcard address, bind operation is complete. - localInetSocketAddress = - new InetSocketAddress(result.getLocalAddress(), result.getLocalPort()); - } - } catch (SocketException ex) { - logger.log(Level.SEVERE, null, ex); - } catch (UnknownHostException ex) { - logger.log(Level.SEVERE, null, ex); - } - if (result != null){ - synchronized (localEndpoints){ - UDPEndPoint exists = localEndpoints.get(localInetSocketAddress); - if (exists != null) result = exists; - else localEndpoints.put(localInetSocketAddress, result); - } - } - return result; // may be null. - } - - /** - * Allows a custom endpoint to be added to the pool. - * @param endpoint - */ - public static void put(UDPEndPoint endpoint){ - SocketAddress local = endpoint.getSocket().getLocalSocketAddress(); - synchronized (localEndpoints){ - localEndpoints.put(local, endpoint); - } - } - - //object fields - private final int port; - - private final DatagramSocket dgSocket; - - //active sessions keyed by socket ID - private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>(); - - //last received packet - private UDTPacket lastPacket; - - //if the endpoint is configured for a server socket, - //this queue is used to handoff new UDTSessions to the application - private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>(); - - private final ObjectPool<BlockingQueue<UDTSession>> queuePool - = new ObjectPool<BlockingQueue<UDTSession>>(20); - - private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff - = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120); - - // registered sockets - private final Set<Integer> registeredSockets = new HashSet<Integer>(120); - // registered sockets lock - private final ReadWriteLock rwl = new ReentrantReadWriteLock(); - private final Lock readSocketLock = rwl.readLock(); - private final Lock writeSocketLock = rwl.writeLock(); - - - private boolean serverSocketMode=false; - - //has the endpoint been stopped? - private volatile boolean stopped=false; - - private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000)); - - - /** - * create an endpoint on the given socket - * - * @param socket - a UDP datagram socket - * @throws SocketException - */ - protected UDPEndPoint(DatagramSocket socket) throws SocketException{ - this.dgSocket=socket; - if (!socket.isBound()){ - socket.bind(null); - } - port=dgSocket.getLocalPort(); - } - - /** - * bind to any local port on the given host address - * @param localAddress - * @throws SocketException - * @throws UnknownHostException - */ - private UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{ - this(localAddress,0); - } - - /** - * Bind to the given address and port - * @param localAddress - * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port. - * @throws SocketException - * @throws UnknownHostException - */ - private UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{ - if(localAddress==null){ - dgSocket=new DatagramSocket(localPort, localAddress); - }else{ - dgSocket=new DatagramSocket(localPort); - } - if(localPort>0)this.port = localPort; - else port=dgSocket.getLocalPort(); - - configureSocket(); - } - - private UDPEndPoint (InetSocketAddress localSocketAddress) - throws SocketException, UnknownHostException { - dgSocket = new DatagramSocket(localSocketAddress); - port = dgSocket.getLocalPort(); - configureSocket(); - } - - protected void configureSocket()throws SocketException{ - //set a time out to avoid blocking in doReceive() - dgSocket.setSoTimeout(100000); - //buffer size - dgSocket.setReceiveBufferSize(128*1024); - dgSocket.setReuseAddress(false); - } - - /** - * bind to the default network interface on the machine - * - * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port. - * @throws SocketException - * @throws UnknownHostException - */ - public UDPEndPoint(int localPort)throws SocketException, UnknownHostException{ - this(null,localPort); - } - - /** - * bind to an ephemeral port on the default network interface on the machine - * - * @throws SocketException - * @throws UnknownHostException - */ - public UDPEndPoint()throws SocketException, UnknownHostException{ - this(null,0); - } - - /** - * start the endpoint. If the serverSocketModeEnabled flag is <code>true</code>, - * a new connection can be handed off to an application. The application needs to - * call #accept() to get the socket - * @param serverSocketModeEnabled - */ - public void start(boolean serverSocketModeEnabled){ - serverSocketMode=serverSocketModeEnabled; - //start receive thread - Runnable receive=new Runnable(){ - public void run(){ - try{ - doReceive(); - }catch(Exception ex){ - logger.log(Level.WARNING,"",ex); - } - } - }; - Thread t=UDTThreadFactory.get().newThread(receive); - t.setName("UDPEndpoint-"+t.getName()); - t.setDaemon(true); - t.start(); - logger.info("UDTEndpoint started."); - } - - public void start(){ - start(false); - } - - public void stop(){ - stopped=true; - dgSocket.close(); - } - - /** - * Provides assistance to a socket to determine a random socket id, - * every caller receives a unique value. This value is unique at the - * time of calling, however it may not be at registration time. - * - * This socketID has not been registered, all socket ID's must be - * registered or connection will fail. - * @return - */ - public int getUniqueSocketID(){ - Integer socketID = nextSocketID.getAndIncrement(); - try{ - readSocketLock.lock(); - while (registeredSockets.contains(socketID)){ - socketID = nextSocketID.getAndIncrement(); - } - return socketID; // should we register it? - } finally { - readSocketLock.unlock(); - } - } - - void registerSocketID(int socketID, UDTSocket socket) throws SocketException { - if (!equals(socket.getEndpoint())) throw new SocketException ( - "Socket doesn't originate for this endpoint: " - + socket.toString()); - try { - writeSocketLock.lock(); - if (registeredSockets.contains(socketID)){ - throw new SocketException("Already registered, Socket ID: " +socketID); - } - registeredSockets.add(socketID); - }finally{ - writeSocketLock.unlock(); - } - } - - /** - * @return the port which this client is bound to - */ - public int getLocalPort() { - return this.dgSocket.getLocalPort(); - } - /** - * @return Gets the local address to which the socket is bound - */ - public InetAddress getLocalAddress(){ - return this.dgSocket.getLocalAddress(); - } - - DatagramSocket getSocket(){ - return dgSocket; - } - - UDTPacket getLastPacket(){ - return lastPacket; - } - - public void addSession(Integer destinationID,UDTSession session){ - logger.log(Level.INFO, "Storing session <{0}>", destinationID); - sessions.put(destinationID, session); - } - - public UDTSession getSession(Long destinationID){ - return sessions.get(destinationID); - } - - public Collection<UDTSession> getSessions(){ - return sessions.values(); - } - - /** - * wait the given time for a new connection - * @param timeout - the time to wait - * @param unit - the {@link TimeUnit} - * @param socketID - the socket id. - * @return a new {@link UDTSession} - * @throws InterruptedException - */ - protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{ - //return sessionHandoff.poll(timeout, unit); - BlockingQueue<UDTSession> session = handoff.get(socketID); - try { - if (session == null){ - session = queuePool.get(); - if (session == null) { - session = new ArrayBlockingQueue<UDTSession>(1); - } - BlockingQueue<UDTSession> existed = handoff.putIfAbsent(socketID,session); - if (existed != null){ - session = existed; - } - } - return session.poll(timeout, unit); - } finally { - boolean removed = handoff.remove(socketID, session); - if (removed){ - session.clear(); - queuePool.accept(session); - } - } - } - - - final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE); - - /** - * single receive, run in the receiverThread, see {@link #start()} - * <ul> - * <li>Receives UDP packets from the network</li> - * <li>Converts them to UDT packets</li> - * <li>dispatches the UDT packets according to their destination ID.</li> - * </ul> - * @throws IOException - */ - private long lastDestID=-1; - private UDTSession lastSession; - - private int n=0; - - private final Object lock=new Object(); - - protected void doReceive()throws IOException{ - while(!stopped){ - try{ - //will block until a packet is received or timeout has expired - dgSocket.receive(dp); - UDTSocketAddress peer= null; - int l=dp.getLength(); - UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); - lastPacket=packet; - //handle connection handshake - if(packet.isConnectionHandshake()){ - synchronized(lock){ - Long id=Long.valueOf(packet.getDestinationID()); - UDTSession session=sessions.get(id); - if(session==null){ // What about DOS? - session=new ServerSession(dp,this); - addSession(session.getSocketID(),session); - //TODO need to check peer to avoid duplicate server session - if(serverSocketMode){ - logger.fine("Pooling new request."); -// sessionHandoff.put(session); // blocking method, what about offer? - BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID()); - if (queue != null){ - boolean success = queue.offer(session); - if (success){ - logger.fine("Request taken for processing."); - } else { - logger.fine("Request discarded, queue full."); - } - } else { - logger.fine("No ServerSocket listening at socketID: " - + session.getSocketID() - + "to answer request"); - } - } - } - peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); - peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(), - ((ConnectionHandshake)packet).getSocketID()); - session.received(packet,peer); - } - } - else{ - //dispatch to existing session - long dest=packet.getDestinationID(); - UDTSession session; - if(dest==lastDestID){ - session=lastSession; - } - else{ - session=sessions.get(dest); - lastSession=session; - lastDestID=dest; - } - if(session==null){ - n++; - if(n%100==1){ - logger.warning("Unknown session <"+dest - +"> requested from <"+peer+"> packet type " - +packet.getClass().getName()); - } - } - else{ - session.received(packet,peer); - } - } - }catch(SocketException ex){ - logger.log(Level.INFO, "SocketException: "+ex.getMessage()); - }catch(SocketTimeoutException ste){ - //can safely ignore... we will retry until the endpoint is stopped - }catch(Exception ex){ - logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); - } - } - } - - protected void doSend(UDTPacket packet)throws IOException{ - byte[]data=packet.getEncoded(); - DatagramPacket dgp = packet.getSession().getDatagram(); - dgp.setData(data); - dgSocket.send(dgp); - } - - public String toString(){ - return "UDPEndpoint port="+port; - } - - public void sendRaw(DatagramPacket p)throws IOException{ - dgSocket.send(p); - } -} Copied: udt-java/skunk/src/main/java/udt/UDPMultiplexer.java (from rev 60, udt-java/skunk/src/main/java/udt/UDPEndPoint.java) =================================================================== --- udt-java/skunk/src/main/java/udt/UDPMultiplexer.java (rev 0) +++ udt-java/skunk/src/main/java/udt/UDPMultiplexer.java 2011-08-05 06:54:24 UTC (rev 61) @@ -0,0 +1,540 @@ +/********************************************************************************* + * Copyright (c) 2010 Forschungszentrum Juelich GmbH + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * (1) Redistributions of source code must retain the above copyright notice, + * this list of conditions and the disclaimer at the end. Redistributions in + * binary form must reproduce the above copyright notice, this list of + * conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * DISCLAIMER + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + *********************************************************************************/ + +package udt; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +import udt.packets.ConnectionHandshake; +import udt.packets.UDTSocketAddress; +import udt.packets.PacketFactory; +import udt.util.ObjectPool; +import udt.util.UDTThreadFactory; + +/** + * the UDPMultiplexer takes care of sending and receiving UDP network packets, + * dispatching them to the correct {@link UDTSession} + */ +public class UDPMultiplexer { + + //class fields + private static final Logger logger=Logger.getLogger(ClientSession.class.getName()); + public static final int DATAGRAM_SIZE=1400; + + + //class methods + private static final WeakHashMap<SocketAddress, UDPMultiplexer> localEndpoints + = new WeakHashMap<SocketAddress, UDPMultiplexer>(); + + public static UDPMultiplexer get(DatagramSocket socket){ + SocketAddress localInetSocketAddress = null; + UDPMultiplexer result = null; + if ( socket.isBound()){ + SocketAddress sa = socket.getLocalSocketAddress(); + if ( sa instanceof InetSocketAddress ){ + localInetSocketAddress = (InetSocketAddress) sa; + } else { + // Must be a special DatagramSocket impl or extended. + localInetSocketAddress = + new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); + } + synchronized (localEndpoints){ + result = localEndpoints.get(localInetSocketAddress); + } + } + if (result != null) return result; + try { + result = new UDPMultiplexer(socket); + if (localInetSocketAddress == null){ + // The DatagramSocket was unbound, it should be bound now. + localInetSocketAddress = + new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); + } + } catch (SocketException ex) { + Logger.getLogger(UDPMultiplexer.class.getName()).log(Level.SEVERE, null, ex); + } + if (result != null){ + synchronized (localEndpoints){ + UDPMultiplexer exists = localEndpoints.get(localInetSocketAddress); + if (exists != null && exists.getSocket().equals(socket)) result = exists; + // Only cache if a record doesn't already exist. + else if (exists == null) localEndpoints.put(localInetSocketAddress, result); + } + } + return result; // may be null. + } + + + + public static UDPMultiplexer get(InetAddress localAddress, int localPort){ + InetSocketAddress localInetSocketAddress = new InetSocketAddress(localAddress, localPort); + return get(localInetSocketAddress); + } + + public static UDPMultiplexer get(SocketAddress localSocketAddress){ + InetSocketAddress localInetSocketAddress = null; + if (localSocketAddress instanceof InetSocketAddress){ + localInetSocketAddress = (InetSocketAddress) localSocketAddress; + } else if (localSocketAddress instanceof UDTSocketAddress){ + UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress; + localInetSocketAddress = + new InetSocketAddress(udtSA.getAddress(), udtSA.getPort()); + } + if (localInetSocketAddress == null) return null; + UDPMultiplexer result = null; + synchronized (localEndpoints){ + result = localEndpoints.get(localInetSocketAddress); + } + if (result != null) return result; + try { + result = new UDPMultiplexer(localInetSocketAddress); + if (localInetSocketAddress.getPort() == 0 || + localInetSocketAddress.getAddress().isAnyLocalAddress()){ + // ephemeral port or wildcard address, bind operation is complete. + localInetSocketAddress = + new InetSocketAddress(result.getLocalAddress(), result.getLocalPort()); + } + } catch (SocketException ex) { + logger.log(Level.SEVERE, null, ex); + } catch (UnknownHostException ex) { + logger.log(Level.SEVERE, null, ex); + } + if (result != null){ + synchronized (localEndpoints){ + UDPMultiplexer exists = localEndpoints.get(localInetSocketAddress); + if (exists != null) result = exists; + else localEndpoints.put(localInetSocketAddress, result); + } + } + return result; // may be null. + } + + /** + * Allows a custom endpoint to be added to the pool. + * @param endpoint + */ + public static void put(UDPMultiplexer endpoint){ + SocketAddress local = endpoint.getSocket().getLocalSocketAddress(); + synchronized (localEndpoints){ + localEndpoints.put(local, endpoint); + } + } + + //object fields + private final int port; + + private final DatagramSocket dgSocket; + + //active sessions keyed by socket ID + private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>(); + + //last received packet + private UDTPacket lastPacket; + + //if the endpoint is configured for a server socket, + //this queue is used to handoff new UDTSessions to the application + private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>(); + + private final ObjectPool<BlockingQueue<UDTSession>> queuePool + = new ObjectPool<BlockingQueue<UDTSession>>(20); + + private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff + = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120); + + // registered sockets + private final Set<Integer> registeredSockets = new HashSet<Integer>(120); + // registered sockets lock + private final ReadWriteLock rwl = new ReentrantReadWriteLock(); + private final Lock readSocketLock = rwl.readLock(); + private final Lock writeSocketLock = rwl.writeLock(); + + + private boolean serverSocketMode=false; + + //has the endpoint been stopped? + private volatile boolean stopped=false; + + private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000)); + + + /** + * create an endpoint on the given socket + * + * @param socket - a UDP datagram socket + * @throws SocketException + */ + protected UDPMultiplexer(DatagramSocket socket) throws SocketException{ + this.dgSocket=socket; + if (!socket.isBound()){ + socket.bind(null); + } + port=dgSocket.getLocalPort(); + } + + /** + * bind to any local port on the given host address + * @param localAddress + * @throws SocketException + * @throws UnknownHostException + */ + private UDPMultiplexer(InetAddress localAddress)throws SocketException, UnknownHostException{ + this(localAddress,0); + } + + /** + * Bind to the given address and port + * @param localAddress + * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port. + * @throws SocketException + * @throws UnknownHostException + */ + private UDPMultiplexer(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{ + if(localAddress==null){ + dgSocket=new DatagramSocket(localPort, localAddress); + }else{ + dgSocket=new DatagramSocket(localPort); + } + if(localPort>0)this.port = localPort; + else port=dgSocket.getLocalPort(); + + configureSocket(); + } + + private UDPMultiplexer (InetSocketAddress localSocketAddress) + throws SocketException, UnknownHostException { + dgSocket = new DatagramSocket(localSocketAddress); + port = dgSocket.getLocalPort(); + configureSocket(); + } + + protected void configureSocket()throws SocketException{ + //set a time out to avoid blocking in doReceive() + dgSocket.setSoTimeout(100000); + //buffer size + dgSocket.setReceiveBufferSize(128*1024); + dgSocket.setReuseAddress(false); + } + + /** + * bind to the default network interface on the machine + * + * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port. + * @throws SocketException + * @throws UnknownHostException + */ + public UDPMultiplexer(int localPort)throws SocketException, UnknownHostException{ + this(null,localPort); + } + + /** + * bind to an ephemeral port on the default network interface on the machine + * + * @throws SocketException + * @throws UnknownHostException + */ + public UDPMultiplexer()throws SocketException, UnknownHostException{ + this(null,0); + } + + /** + * start the endpoint. If the serverSocketModeEnabled flag is <code>true</code>, + * a new connection can be handed off to an application. The application needs to + * call #accept() to get the socket + * @param serverSocketModeEnabled + */ + public void start(boolean serverSocketModeEnabled){ + serverSocketMode=serverSocketModeEnabled; + //start receive thread + Runnable receive=new Runnable(){ + public void run(){ + try{ + doReceive(); + }catch(Exception ex){ + logger.log(Level.WARNING,"",ex); + } + } + }; + Thread t=UDTThreadFactory.get().newThread(receive); + t.setName("UDPEndpoint-"+t.getName()); + t.setDaemon(true); + t.start(); + logger.info("UDTEndpoint started."); + } + + public void start(){ + start(false); + } + + public void stop(){ + stopped=true; + dgSocket.close(); + } + + /** + * Provides assistance to a socket to determine a random socket id, + * every caller receives a unique value. This value is unique at the + * time of calling, however it may not be at registration time. + * + * This socketID has not been registered, all socket ID's must be + * registered or connection will fail. + * @return + */ + public int getUniqueSocketID(){ + Integer socketID = nextSocketID.getAndIncrement(); + try{ + readSocketLock.lock(); + while (registeredSockets.contains(socketID)){ + socketID = nextSocketID.getAndIncrement(); + } + return socketID; // should we register it? + } finally { + readSocketLock.unlock(); + } + } + + void registerSocketID(int socketID, UDTSocket socket) throws SocketException { + if (!equals(socket.getEndpoint())) throw new SocketException ( + "Socket doesn't originate for this endpoint: " + + socket.toString()); + try { + writeSocketLock.lock(); + if (registeredSockets.contains(socketID)){ + throw new SocketException("Already registered, Socket ID: " +socketID); + } + registeredSockets.add(socketID); + }finally{ + writeSocketLock.unlock(); + } + } + + /** + * @return the port which this client is bound to + */ + public int getLocalPort() { + return this.dgSocket.getLocalPort(); + } + /** + * @return Gets the local address to which the socket is bound + */ + public InetAddress getLocalAddress(){ + return this.dgSocket.getLocalAddress(); + } + + DatagramSocket getSocket(){ + return dgSocket; + } + + UDTPacket getLastPacket(){ + return lastPacket; + } + + public void addSession(Integer destinationID,UDTSession session){ + logger.log(Level.INFO, "Storing session <{0}>", destinationID); + sessions.put(destinationID, session); + } + + public UDTSession getSession(Long destinationID){ + return sessions.get(destinationID); + } + + public Collection<UDTSession> getSessions(){ + return sessions.values(); + } + + /** + * wait the given time for a new connection + * @param timeout - the time to wait + * @param unit - the {@link TimeUnit} + * @param socketID - the socket id. + * @return a new {@link UDTSession} + * @throws InterruptedException + */ + protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{ + //return sessionHandoff.poll(timeout, unit); + BlockingQueue<UDTSession> session = handoff.get(socketID); + try { + if (session == null){ + session = queuePool.get(); + if (session == null) { + session = new ArrayBlockingQueue<UDTSession>(1); + } + BlockingQueue<UDTSession> existed = handoff.putIfAbsent(socketID,session); + if (existed != null){ + session = existed; + } + } + return session.poll(timeout, unit); + } finally { + boolean removed = handoff.remove(socketID, session); + if (removed){ + session.clear(); + queuePool.accept(session); + } + } + } + + + final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE); + + /** + * single receive, run in the receiverThread, see {@link #start()} + * <ul> + * <li>Receives UDP packets from the network</li> + * <li>Converts them to UDT packets</li> + * <li>dispatches the UDT packets according to their destination ID.</li> + * </ul> + * @throws IOException + */ + private long lastDestID=-1; + private UDTSession lastSession; + + private int n=0; + + private final Object lock=new Object(); + + protected void doReceive()throws IOException{ + while(!stopped){ + try{ + //will block until a packet is received or timeout has expired + dgSocket.receive(dp); + UDTSocketAddress peer= null; + int l=dp.getLength(); + UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); + lastPacket=packet; + //handle connection handshake + if(packet.isConnectionHandshake()){ + synchronized(lock){ + Long id=Long.valueOf(packet.getDestinationID()); + UDTSession session=sessions.get(id); + if(session==null){ // What about DOS? + session=new ServerSession(dp,this); + addSession(session.getSocketID(),session); + //TODO need to check peer to avoid duplicate server session + if(serverSocketMode){ + logger.fine("Pooling new request."); +// sessionHandoff.put(session); // blocking method, what about offer? + BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID()); + if (queue != null){ + boolean success = queue.offer(session); + if (success){ + logger.fine("Request taken for processing."); + } else { + logger.fine("Request discarded, queue full."); + } + } else { + logger.fine("No ServerSocket listening at socketID: " + + session.getSocketID() + + "to answer request"); + } + } + } + peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); + peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(), + ((ConnectionHandshake)packet).getSocketID()); + session.received(packet,peer); + } + } + else{ + //dispatch to existing session + long dest=packet.getDestinationID(); + UDTSession session; + if(dest==lastDestID){ + session=lastSession; + } + else{ + session=sessions.get(dest); + lastSession=session; + lastDestID=dest; + } + if(session==null){ + n++; + if(n%100==1){ + logger.warning("Unknown session <"+dest + +"> requested from <"+peer+"> packet type " + +packet.getClass().getName()); + } + } + else{ + session.received(packet,peer); + } + } + }catch(SocketException ex){ + logger.log(Level.INFO, "SocketException: "+ex.getMessage()); + }catch(SocketTimeoutException ste){ + //can safely ignore... we will retry until the endpoint is stopped + }catch(Exception ex){ + logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); + } + } + } + + protected void doSend(UDTPacket packet)throws IOException{ + byte[]data=packet.getEncoded(); + DatagramPacket dgp = packet.getSession().getDatagram(); + dgp.setData(data); + dgSocket.send(dgp); + } + + public String toString(){ + return "UDPEndpoint port="+port; + } + + public void sendRaw(DatagramPacket p)throws IOException{ + dgSocket.send(p); + } +} Modified: udt-java/skunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 06:54:24 UTC (rev 61) @@ -48,23 +48,23 @@ public class UDTClient { private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); - private final UDPEndPoint clientEndpoint; + private final UDPMultiplexer clientEndpoint; private ClientSession clientSession; public UDTClient(InetAddress address, int localport)throws SocketException, UnknownHostException{ //create endpoint - clientEndpoint= UDPEndPoint.get(address,localport); + clientEndpoint= UDPMultiplexer.get(address,localport); logger.info("Created client endpoint on port "+localport); } public UDTClient(InetAddress address)throws SocketException, UnknownHostException{ //create endpoint - clientEndpoint= UDPEndPoint.get(address, 0); + clientEndpoint= UDPMultiplexer.get(address, 0); logger.info("Created client endpoint on port "+clientEndpoint.getLocalPort()); } - public UDTClient(UDPEndPoint endpoint)throws SocketException, UnknownHostException{ + public UDTClient(UDPMultiplexer endpoint)throws SocketException, UnknownHostException{ clientEndpoint=endpoint; } @@ -153,7 +153,7 @@ return clientSession.getSocket().getOutputStream(); } - public UDPEndPoint getEndpoint()throws IOException{ + public UDPMultiplexer getEndpoint()throws IOException{ return clientEndpoint; } Modified: udt-java/skunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 06:54:24 UTC (rev 61) @@ -175,7 +175,7 @@ statistics.setSendPeriod(packetSendingPeriod); } - private final long PS=UDPEndPoint.DATAGRAM_SIZE; + private final long PS=UDPMultiplexer.DATAGRAM_SIZE; private final double BetaDivPS=0.0000015/PS; //see spec page 16 @@ -184,7 +184,7 @@ double remaining=estimatedLinkCapacity-1000000.0/packetSendingPeriod; if(remaining<=0){ - return 1.0/UDPEndPoint.DATAGRAM_SIZE; + return 1.0/UDPMultiplexer.DATAGRAM_SIZE; } else{ double exp=Math.ceil(Math.log10(remaining*PS*8)); Modified: udt-java/skunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 06:54:24 UTC (rev 61) @@ -67,7 +67,7 @@ private static final Logger logger=Logger.getLogger(UDTReceiver.class.getName()); - private final UDPEndPoint endpoint; + private final UDPMultiplexer endpoint; private final UDTSession session; @@ -159,7 +159,7 @@ * create a receiver with a valid {@link UDTSession} * @param session */ - public UDTReceiver(UDTSession session,UDPEndPoint endpoint){ + public UDTReceiver(UDTSession session,UDPMultiplexer endpoint){ this.endpoint = endpoint; this.session=session; this.sessionUpSince=System.currentTimeMillis(); Modified: udt-java/skunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 06:54:24 UTC (rev 61) @@ -67,7 +67,7 @@ private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); - private final UDPEndPoint endpoint; + private final UDPMultiplexer endpoint; private final UDTSession session; private final UDTStatistics statistics; @@ -117,7 +117,7 @@ private final boolean storeStatistics; private final int chunksize; - public UDTSender(UDTSession session,UDPEndPoint endpoint){ + public UDTSender(UDTSession session,UDPMultiplexer endpoint){ if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); this.endpoint= endpoint; this.session=session; Modified: udt-java/skunk/src/main/java/udt/UDTServerSocket.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 06:54:24 UTC (rev 61) @@ -48,7 +48,7 @@ public class UDTServerSocket extends ServerSocket { private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); - private volatile UDPEndPoint endpoint; + private volatile UDPMultiplexer endpoint; private volatile InetAddress localAdd; private volatile int locPort; private volatile SocketAddress localSocketAddress; @@ -64,7 +64,7 @@ */ public UDTServerSocket(InetAddress localAddress, int port)throws UnknownHostException, IOException{ super(); - endpoint= UDPEndPoint.get(localAddress,port); + endpoint= UDPMultiplexer.get(localAddress,port); localAdd = localAddress; locPort = port; bound = true; @@ -105,7 +105,7 @@ throw new IOException("UDTSession was null"); } - public UDPEndPoint getEndpoint(){ + public UDPMultiplexer getEndpoint(){ return endpoint; } Modified: udt-java/skunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 06:54:24 UTC (rev 61) @@ -91,7 +91,7 @@ protected int localPort; - public static final int DEFAULT_DATAGRAM_SIZE=UDPEndPoint.DATAGRAM_SIZE; + public static final int DEFAULT_DATAGRAM_SIZE=UDPMultiplexer.DATAGRAM_SIZE; /** * key for a system property defining the CC class to be used Modified: udt-java/skunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 06:54:24 UTC (rev 61) @@ -62,7 +62,7 @@ = new ArrayList<UDTSocketAddress>(120); //endpoint - private volatile UDPEndPoint endpoint; + private volatile UDPMultiplexer endpoint; private volatile boolean active; private volatile boolean connected; @@ -90,7 +90,7 @@ * @param endpoint * @throws SocketException,UnknownHostException */ - UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{ + UDTSocket(UDPMultiplexer endpoint, UDTSession session)throws SocketException,UnknownHostException{ super(); this.endpoint=endpoint; this.session=session; @@ -105,7 +105,7 @@ public UDTSocket(InetAddress host, int port ) throws SocketException, UnknownHostException{ super(); - this.endpoint = UDPEndPoint.get(host, port); + this.endpoint = UDPMultiplexer.get(host, port); this.session = null; this.receiver = null; this.sender = null; @@ -193,7 +193,7 @@ if (boundSockets.contains(bindpoint)) throw new IOException("A socket is already bound to this address"); } - endpoint = UDPEndPoint.get(bindpoint); + endpoint = UDPMultiplexer.get(bindpoint); if (endpoint == null) throw new SocketException("Failed to bind to UDPEndPoint"); bound = true; } @@ -517,7 +517,7 @@ return active; } - public UDPEndPoint getEndpoint() { + public UDPMultiplexer getEndpoint() { return endpoint; } Modified: udt-java/skunk/src/main/java/udt/util/Util.java =================================================================== --- udt-java/skunk/src/main/java/udt/util/Util.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/util/Util.java 2011-08-05 06:54:24 UTC (rev 61) @@ -40,7 +40,7 @@ import java.net.InetAddress; import java.security.MessageDigest; -import udt.UDPEndPoint; +import udt.UDPMultiplexer; /** * helper methods @@ -150,7 +150,7 @@ * @return the local port that can now be accessed by the client * @throws IOException */ - public static void doHolePunch(UDPEndPoint endpoint,InetAddress client, int clientPort)throws IOException{ + public static void doHolePunch(UDPMultiplexer endpoint,InetAddress client, int clientPort)throws IOException{ DatagramPacket p=new DatagramPacket(new byte[1],1); p.setAddress(client); p.setPort(clientPort); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <pe...@us...> - 2011-08-05 04:24:09
|
Revision: 60 http://udt-java.svn.sourceforge.net/udt-java/?rev=60&view=rev Author: pete_ Date: 2011-08-05 04:24:00 +0000 (Fri, 05 Aug 2011) Log Message: ----------- Experimental changes, partial implementation of java sockets and UDP multiplexing. Modified Paths: -------------- udt-java/skunk/src/main/java/udt/ClientSession.java udt-java/skunk/src/main/java/udt/ServerSession.java udt-java/skunk/src/main/java/udt/UDPEndPoint.java udt-java/skunk/src/main/java/udt/UDTClient.java udt-java/skunk/src/main/java/udt/UDTCongestionControl.java udt-java/skunk/src/main/java/udt/UDTInputStream.java udt-java/skunk/src/main/java/udt/UDTReceiver.java udt-java/skunk/src/main/java/udt/UDTSender.java udt-java/skunk/src/main/java/udt/UDTServerSocket.java udt-java/skunk/src/main/java/udt/UDTSession.java udt-java/skunk/src/main/java/udt/UDTSocket.java udt-java/skunk/src/main/java/udt/packets/ConnectionHandshake.java udt-java/skunk/src/main/java/udt/packets/DataPacket.java udt-java/skunk/src/main/java/udt/packets/PacketFactory.java udt-java/skunk/src/main/java/udt/packets/PacketUtil.java udt-java/skunk/src/main/java/udt/unicore/FufexSend.java udt-java/skunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/skunk/src/main/java/udt/util/ReceiveFile.java udt-java/skunk/src/main/java/udt/util/SendFile.java udt-java/skunk/src/main/java/udt/util/SequenceNumber.java udt-java/skunk/src/test/java/echo/EchoServer.java udt-java/skunk/src/test/java/echo/TestEchoServer.java udt-java/skunk/src/test/java/echo/TestEchoServerMultiClient.java udt-java/skunk/src/test/java/udt/TestUDTServerSocket.java udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java udt-java/skunk/src/test/java/udt/performance/TestUDTLargeData.java Added Paths: ----------- udt-java/skunk/src/main/java/udt/packets/UDTSocketAddress.java udt-java/skunk/src/main/java/udt/util/ObjectPool.java udt-java/skunk/src/main/java/udt/util/Recycler.java Removed Paths: ------------- udt-java/skunk/src/main/java/udt/packets/Destination.java Property Changed: ---------------- udt-java/skunk/ Property changes on: udt-java/skunk ___________________________________________________________________ Added: svn:ignore + target Modified: udt-java/skunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 04:24:00 UTC (rev 60) @@ -38,7 +38,7 @@ import java.util.logging.Logger; import udt.packets.ConnectionHandshake; -import udt.packets.Destination; +import udt.packets.UDTSocketAddress; import udt.packets.Shutdown; import udt.util.SequenceNumber; @@ -52,7 +52,7 @@ private UDPEndPoint endPoint; - public ClientSession(UDPEndPoint endPoint, Destination dest)throws SocketException{ + public ClientSession(UDPEndPoint endPoint, UDTSocketAddress dest)throws SocketException{ super("ClientSession localPort="+endPoint.getLocalPort(),dest); this.endPoint=endPoint; logger.info("Created "+toString()); @@ -78,7 +78,7 @@ } @Override - public void received(UDTPacket packet, Destination peer) { + public void received(UDTPacket packet, UDTSocketAddress peer) { lastPacket=packet; @@ -91,7 +91,7 @@ if(hs.getConnectionType()==1){ try{ //TODO validate parameters sent by peer - long peerSocketID=hs.getSocketID(); + int peerSocketID=hs.getSocketID(); destination.setSocketID(peerSocketID); sendConfirmation(hs); }catch(Exception ex){ @@ -103,10 +103,12 @@ else{ try{ //TODO validate parameters sent by peer - long peerSocketID=hs.getSocketID(); + int peerSocketID=hs.getSocketID(); destination.setSocketID(peerSocketID); setState(ready); + if (socket == null){ socket=new UDTSocket(endPoint,this); + } }catch(Exception ex){ logger.log(Level.WARNING,"Error creating socket",ex); setState(invalid); @@ -146,9 +148,7 @@ ConnectionHandshake handshake = new ConnectionHandshake(); handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR); handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM); - long initialSequenceNo=SequenceNumber.random(); - setInitialSequenceNumber(initialSequenceNo); - handshake.setInitialSeqNo(initialSequenceNo); + handshake.setInitialSeqNo(getCurrentSequenceNumber()); handshake.setPacketSize(getDatagramSize()); handshake.setSocketID(mySocketID); handshake.setMaxFlowWndSize(flowWindowSize); Modified: udt-java/skunk/src/main/java/udt/ServerSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 04:24:00 UTC (rev 60) @@ -40,7 +40,7 @@ import java.util.logging.Logger; import udt.packets.ConnectionHandshake; -import udt.packets.Destination; +import udt.packets.UDTSocketAddress; import udt.packets.KeepAlive; import udt.packets.Shutdown; @@ -57,7 +57,7 @@ private UDTPacket lastPacket; public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{ - super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new Destination(dp.getAddress(),dp.getPort())); + super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new UDTSocketAddress(dp.getAddress(),dp.getPort(),0)); this.endPoint=endPoint; logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort()); } @@ -65,7 +65,7 @@ int n_handshake=0; @Override - public void received(UDTPacket packet, Destination peer){ + public void received(UDTPacket packet, UDTSocketAddress peer){ lastPacket=packet; if(packet instanceof ConnectionHandshake) { @@ -83,7 +83,9 @@ n_handshake++; try{ setState(ready); + if (socket == null){ socket=new UDTSocket(endPoint, this); + } cc.init(); }catch(Exception uhe){ //session is invalid @@ -166,7 +168,7 @@ long clientBufferSize=handshake.getPacketSize(); long myBufferSize=getDatagramSize(); long bufferSize=Math.min(clientBufferSize, myBufferSize); - long initialSequenceNumber=handshake.getInitialSeqNo(); + int initialSequenceNumber=(int) handshake.getInitialSeqNo(); setInitialSequenceNumber(initialSequenceNumber); setDatagramSize((int)bufferSize); responseHandshake.setPacketSize(bufferSize); Modified: udt-java/skunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 04:24:00 UTC (rev 60) @@ -36,20 +36,34 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Collection; +import java.util.HashSet; import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; import java.util.logging.Logger; import udt.packets.ConnectionHandshake; -import udt.packets.Destination; +import udt.packets.UDTSocketAddress; import udt.packets.PacketFactory; +import udt.util.ObjectPool; import udt.util.UDTThreadFactory; /** @@ -58,14 +72,116 @@ */ public class UDPEndPoint { + //class fields private static final Logger logger=Logger.getLogger(ClientSession.class.getName()); + public static final int DATAGRAM_SIZE=1400; + + //class methods + private static final WeakHashMap<SocketAddress, UDPEndPoint> localEndpoints + = new WeakHashMap<SocketAddress, UDPEndPoint>(); + + public static UDPEndPoint get(DatagramSocket socket){ + SocketAddress localInetSocketAddress = null; + UDPEndPoint result = null; + if ( socket.isBound()){ + SocketAddress sa = socket.getLocalSocketAddress(); + if ( sa instanceof InetSocketAddress ){ + localInetSocketAddress = (InetSocketAddress) sa; + } else { + // Must be a special DatagramSocket impl or extended. + localInetSocketAddress = + new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); + } + synchronized (localEndpoints){ + result = localEndpoints.get(localInetSocketAddress); + } + } + if (result != null) return result; + try { + result = new UDPEndPoint(socket); + if (localInetSocketAddress == null){ + // The DatagramSocket was unbound, it should be bound now. + localInetSocketAddress = + new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); + } + } catch (SocketException ex) { + Logger.getLogger(UDPEndPoint.class.getName()).log(Level.SEVERE, null, ex); + } + if (result != null){ + synchronized (localEndpoints){ + UDPEndPoint exists = localEndpoints.get(localInetSocketAddress); + if (exists != null && exists.getSocket().equals(socket)) result = exists; + // Only cache if a record doesn't already exist. + else if (exists == null) localEndpoints.put(localInetSocketAddress, result); + } + } + return result; // may be null. + } + + + + public static UDPEndPoint get(InetAddress localAddress, int localPort){ + InetSocketAddress localInetSocketAddress = new InetSocketAddress(localAddress, localPort); + return get(localInetSocketAddress); + } + + public static UDPEndPoint get(SocketAddress localSocketAddress){ + InetSocketAddress localInetSocketAddress = null; + if (localSocketAddress instanceof InetSocketAddress){ + localInetSocketAddress = (InetSocketAddress) localSocketAddress; + } else if (localSocketAddress instanceof UDTSocketAddress){ + UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress; + localInetSocketAddress = + new InetSocketAddress(udtSA.getAddress(), udtSA.getPort()); + } + if (localInetSocketAddress == null) return null; + UDPEndPoint result = null; + synchronized (localEndpoints){ + result = localEndpoints.get(localInetSocketAddress); + } + if (result != null) return result; + try { + result = new UDPEndPoint(localInetSocketAddress); + if (localInetSocketAddress.getPort() == 0 || + localInetSocketAddress.getAddress().isAnyLocalAddress()){ + // ephemeral port or wildcard address, bind operation is complete. + localInetSocketAddress = + new InetSocketAddress(result.getLocalAddress(), result.getLocalPort()); + } + } catch (SocketException ex) { + logger.log(Level.SEVERE, null, ex); + } catch (UnknownHostException ex) { + logger.log(Level.SEVERE, null, ex); + } + if (result != null){ + synchronized (localEndpoints){ + UDPEndPoint exists = localEndpoints.get(localInetSocketAddress); + if (exists != null) result = exists; + else localEndpoints.put(localInetSocketAddress, result); + } + } + return result; // may be null. + } + + /** + * Allows a custom endpoint to be added to the pool. + * @param endpoint + */ + public static void put(UDPEndPoint endpoint){ + SocketAddress local = endpoint.getSocket().getLocalSocketAddress(); + synchronized (localEndpoints){ + localEndpoints.put(local, endpoint); + } + } + + //object fields private final int port; private final DatagramSocket dgSocket; //active sessions keyed by socket ID - private final Map<Long,UDTSession>sessions=new ConcurrentHashMap<Long, UDTSession>(); + private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>(); //last received packet private UDTPacket lastPacket; @@ -74,20 +190,39 @@ //this queue is used to handoff new UDTSessions to the application private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>(); + private final ObjectPool<BlockingQueue<UDTSession>> queuePool + = new ObjectPool<BlockingQueue<UDTSession>>(20); + + private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff + = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120); + + // registered sockets + private final Set<Integer> registeredSockets = new HashSet<Integer>(120); + // registered sockets lock + private final ReadWriteLock rwl = new ReentrantReadWriteLock(); + private final Lock readSocketLock = rwl.readLock(); + private final Lock writeSocketLock = rwl.writeLock(); + + private boolean serverSocketMode=false; //has the endpoint been stopped? private volatile boolean stopped=false; - public static final int DATAGRAM_SIZE=1400; + private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000)); + /** * create an endpoint on the given socket * * @param socket - a UDP datagram socket + * @throws SocketException */ - public UDPEndPoint(DatagramSocket socket){ + protected UDPEndPoint(DatagramSocket socket) throws SocketException{ this.dgSocket=socket; + if (!socket.isBound()){ + socket.bind(null); + } port=dgSocket.getLocalPort(); } @@ -97,7 +232,7 @@ * @throws SocketException * @throws UnknownHostException */ - public UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{ + private UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{ this(localAddress,0); } @@ -108,7 +243,7 @@ * @throws SocketException * @throws UnknownHostException */ - public UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{ + private UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{ if(localAddress==null){ dgSocket=new DatagramSocket(localPort, localAddress); }else{ @@ -120,6 +255,13 @@ configureSocket(); } + private UDPEndPoint (InetSocketAddress localSocketAddress) + throws SocketException, UnknownHostException { + dgSocket = new DatagramSocket(localSocketAddress); + port = dgSocket.getLocalPort(); + configureSocket(); + } + protected void configureSocket()throws SocketException{ //set a time out to avoid blocking in doReceive() dgSocket.setSoTimeout(100000); @@ -184,6 +326,43 @@ } /** + * Provides assistance to a socket to determine a random socket id, + * every caller receives a unique value. This value is unique at the + * time of calling, however it may not be at registration time. + * + * This socketID has not been registered, all socket ID's must be + * registered or connection will fail. + * @return + */ + public int getUniqueSocketID(){ + Integer socketID = nextSocketID.getAndIncrement(); + try{ + readSocketLock.lock(); + while (registeredSockets.contains(socketID)){ + socketID = nextSocketID.getAndIncrement(); + } + return socketID; // should we register it? + } finally { + readSocketLock.unlock(); + } + } + + void registerSocketID(int socketID, UDTSocket socket) throws SocketException { + if (!equals(socket.getEndpoint())) throw new SocketException ( + "Socket doesn't originate for this endpoint: " + + socket.toString()); + try { + writeSocketLock.lock(); + if (registeredSockets.contains(socketID)){ + throw new SocketException("Already registered, Socket ID: " +socketID); + } + registeredSockets.add(socketID); + }finally{ + writeSocketLock.unlock(); + } + } + + /** * @return the port which this client is bound to */ public int getLocalPort() { @@ -204,8 +383,8 @@ return lastPacket; } - public void addSession(Long destinationID,UDTSession session){ - logger.info("Storing session <"+destinationID+">"); + public void addSession(Integer destinationID,UDTSession session){ + logger.log(Level.INFO, "Storing session <{0}>", destinationID); sessions.put(destinationID, session); } @@ -221,12 +400,33 @@ * wait the given time for a new connection * @param timeout - the time to wait * @param unit - the {@link TimeUnit} + * @param socketID - the socket id. * @return a new {@link UDTSession} * @throws InterruptedException */ - protected UDTSession accept(long timeout, TimeUnit unit)throws InterruptedException{ - return sessionHandoff.poll(timeout, unit); + protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{ + //return sessionHandoff.poll(timeout, unit); + BlockingQueue<UDTSession> session = handoff.get(socketID); + try { + if (session == null){ + session = queuePool.get(); + if (session == null) { + session = new ArrayBlockingQueue<UDTSession>(1); } + BlockingQueue<UDTSession> existed = handoff.putIfAbsent(socketID,session); + if (existed != null){ + session = existed; + } + } + return session.poll(timeout, unit); + } finally { + boolean removed = handoff.remove(socketID, session); + if (removed){ + session.clear(); + queuePool.accept(session); + } + } + } final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE); @@ -250,32 +450,42 @@ protected void doReceive()throws IOException{ while(!stopped){ try{ - try{ - //will block until a packet is received or timeout has expired dgSocket.receive(dp); - - Destination peer=new Destination(dp.getAddress(), dp.getPort()); + UDTSocketAddress peer= null; int l=dp.getLength(); UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); lastPacket=packet; - //handle connection handshake if(packet.isConnectionHandshake()){ synchronized(lock){ Long id=Long.valueOf(packet.getDestinationID()); UDTSession session=sessions.get(id); - if(session==null){ + if(session==null){ // What about DOS? session=new ServerSession(dp,this); addSession(session.getSocketID(),session); //TODO need to check peer to avoid duplicate server session if(serverSocketMode){ logger.fine("Pooling new request."); - sessionHandoff.put(session); +// sessionHandoff.put(session); // blocking method, what about offer? + BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID()); + if (queue != null){ + boolean success = queue.offer(session); + if (success){ logger.fine("Request taken for processing."); + } else { + logger.fine("Request discarded, queue full."); } + } else { + logger.fine("No ServerSocket listening at socketID: " + + session.getSocketID() + + "to answer request"); } + } + } peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); + peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(), + ((ConnectionHandshake)packet).getSocketID()); session.received(packet,peer); } } @@ -294,7 +504,9 @@ if(session==null){ n++; if(n%100==1){ - logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); + logger.warning("Unknown session <"+dest + +"> requested from <"+peer+"> packet type " + +packet.getClass().getName()); } } else{ @@ -305,8 +517,6 @@ logger.log(Level.INFO, "SocketException: "+ex.getMessage()); }catch(SocketTimeoutException ste){ //can safely ignore... we will retry until the endpoint is stopped - } - }catch(Exception ex){ logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); } Modified: udt-java/skunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 04:24:00 UTC (rev 60) @@ -33,13 +33,15 @@ package udt; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; import java.util.logging.Level; import java.util.logging.Logger; -import udt.packets.Destination; +import udt.packets.UDTSocketAddress; import udt.packets.Shutdown; import udt.util.UDTStatistics; @@ -52,13 +54,13 @@ public UDTClient(InetAddress address, int localport)throws SocketException, UnknownHostException{ //create endpoint - clientEndpoint=new UDPEndPoint(address,localport); + clientEndpoint= UDPEndPoint.get(address,localport); logger.info("Created client endpoint on port "+localport); } public UDTClient(InetAddress address)throws SocketException, UnknownHostException{ //create endpoint - clientEndpoint=new UDPEndPoint(address); + clientEndpoint= UDPEndPoint.get(address, 0); logger.info("Created client endpoint on port "+clientEndpoint.getLocalPort()); } @@ -75,7 +77,7 @@ */ public void connect(String host, int port)throws InterruptedException, UnknownHostException, IOException{ InetAddress address=InetAddress.getByName(host); - Destination destination=new Destination(address,port); + UDTSocketAddress destination= new UDTSocketAddress(address,port,0); //create client session... clientSession=new ClientSession(clientEndpoint,destination); clientEndpoint.addSession(clientSession.getSocketID(), clientSession); @@ -101,7 +103,7 @@ } /** - * sends the given data and waits for acknowledgement + * sends the given data and waits for acknowledgment * @param data - the data to send * @throws IOException * @throws InterruptedException if interrupted while waiting for ack @@ -143,11 +145,11 @@ } } - public UDTInputStream getInputStream()throws IOException{ + public InputStream getInputStream()throws IOException{ return clientSession.getSocket().getInputStream(); } - public UDTOutputStream getOutputStream()throws IOException{ + public OutputStream getOutputStream()throws IOException{ return clientSession.getSocket().getOutputStream(); } Modified: udt-java/skunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 04:24:00 UTC (rev 60) @@ -65,7 +65,7 @@ public UDTCongestionControl(UDTSession session){ this.session=session; this.statistics=session.getStatistics(); - lastDecreaseSeqNo=session.getInitialSequenceNumber()-1; + lastDecreaseSeqNo=session.getCurrentSequenceNumber()-1; } /* (non-Javadoc) Modified: udt-java/skunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTInputStream.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTInputStream.java 2011-08-05 04:24:00 UTC (rev 60) @@ -66,10 +66,10 @@ * @param socket - the {@link UDTSocket} * @throws IOException */ - public UDTInputStream(UDTSocket socket)throws IOException{ + UDTInputStream(UDTSocket socket)throws IOException{ this.socket=socket; int capacity=socket!=null? 2 * socket.getSession().getFlowWindowSize() : 128 ; - long initialSequenceNum=socket!=null?socket.getSession().getInitialSequenceNumber():1; + int initialSequenceNum=socket!=null?socket.getSession().getCurrentSequenceNumber():1; receiveBuffer=new ReceiveBuffer(capacity,initialSequenceNum); } Modified: udt-java/skunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 04:24:00 UTC (rev 60) @@ -169,7 +169,7 @@ packetHistoryWindow = new PacketHistoryWindow(16); receiverLossList = new ReceiverLossList(); packetPairWindow = new PacketPairWindow(16); - largestReceivedSeqNumber=session.getInitialSequenceNumber()-1; + largestReceivedSeqNumber=session.getCurrentSequenceNumber()-1; bufferSize=session.getReceiveBufferSize(); handoffQueue=new ArrayBlockingQueue<UDTPacket>(4*session.getFlowWindowSize()); storeStatistics=Boolean.getBoolean("udt.receiver.storeStatistics"); @@ -396,7 +396,7 @@ // return; // } // //} - boolean OK=session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData()); + boolean OK= ((UDTInputStream) session.getSocket().getInputStream()).haveNewData(currentSequenceNumber,dp.getData()); if(!OK){ //need to drop packet... return; Modified: udt-java/skunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 04:24:00 UTC (rev 60) @@ -68,9 +68,7 @@ private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); private final UDPEndPoint endpoint; - private final UDTSession session; - private final UDTStatistics statistics; //senderLossList stores the sequence numbers of lost packets @@ -92,31 +90,31 @@ private final AtomicInteger unacknowledged=new AtomicInteger(0); //for generating data packet sequence numbers - private volatile long currentSequenceNumber=0; + //volatile long counters are not atomic. + //private volatile int currentSequenceNumber=0; //the largest data packet sequence number that has actually been sent out - private volatile long largestSentSequenceNumber=-1; + private volatile int largestSentSequenceNumber=-1; //last acknowledge number, initialised to the initial sequence number - private volatile long lastAckSequenceNumber; + private volatile int lastAckSequenceNumber; private volatile boolean started=false; - private volatile boolean stopped=false; - private volatile boolean paused=false; //used to signal that the sender should start to send private volatile CountDownLatch startLatch=new CountDownLatch(1); //used by the sender to wait for an ACK - private final AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>(); + private final AtomicReference<CountDownLatch> waitForAckLatch + =new AtomicReference<CountDownLatch>(); //used by the sender to wait for an ACK of a certain sequence number - private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); + private final AtomicReference<CountDownLatch> waitForSeqAckLatch + =new AtomicReference<CountDownLatch>(); private final boolean storeStatistics; - private final int chunksize; public UDTSender(UDTSession session,UDPEndPoint endpoint){ @@ -128,8 +126,8 @@ sendBuffer=new ConcurrentHashMap<Long, byte[]>(session.getFlowWindowSize(),0.75f,2); chunksize=session.getDatagramSize()-24;//need space for the header; flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize); - lastAckSequenceNumber=session.getInitialSequenceNumber(); - currentSequenceNumber=session.getInitialSequenceNumber()-1; + lastAckSequenceNumber=session.getCurrentSequenceNumber(); +// currentSequenceNumber=session.getCurrentSequenceNumber()-1; waitForAckLatch.set(new CountDownLatch(1)); waitForSeqAckLatch.set(new CountDownLatch(1)); storeStatistics=Boolean.getBoolean("udt.sender.storeStatistics"); @@ -212,6 +210,7 @@ protected void sendUdtPacket(ByteBuffer bb, int timeout, TimeUnit units)throws IOException, InterruptedException{ if(!started)start(); DataPacket packet=null; + do{ packet=flowWindow.getForProducer(); if(packet==null){ @@ -219,7 +218,7 @@ } }while(packet==null);//TODO check timeout... try{ - packet.setPacketSequenceNumber(getNextSequenceNumber()); + packet.setPacketSequenceNumber(incrementAndGetSequenceNo()); packet.setSession(session); packet.setDestinationID(session.getDestination().getSocketID()); int len=Math.min(bb.remaining(),chunksize); @@ -253,7 +252,7 @@ } }while(packet==null); try{ - packet.setPacketSequenceNumber(getNextSequenceNumber()); + packet.setPacketSequenceNumber(incrementAndGetSequenceNo()); packet.setSession(session); packet.setDestinationID(session.getDestination().getSocketID()); packet.setData(data); @@ -309,7 +308,7 @@ unacknowledged.decrementAndGet(); } } - lastAckSequenceNumber=Math.max(lastAckSequenceNumber, ackNumber); + lastAckSequenceNumber=(int) Math.max(lastAckSequenceNumber, ackNumber); //send ACK2 packet to the receiver sendAck2(ackNumber); statistics.incNumberOfACKReceived(); @@ -362,22 +361,20 @@ Long entry=senderLossList.getFirstEntry(); if(entry!=null){ handleRetransmit(entry); - } - else - { + }else{ //if the number of unacknowledged data packets does not exceed the congestion //and the flow window sizes, pack a new packet int unAcknowledged=unacknowledged.get(); - - if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() - && unAcknowledged<session.getFlowWindowSize()){ + if(unAcknowledged<session.getCongestionControl() + .getCongestionWindowSize() + && unAcknowledged<session.getFlowWindowSize()) + { //check for application data DataPacket dp=flowWindow.consumeData(); if(dp!=null){ send(dp); - largestSentSequenceNumber=dp.getPacketSequenceNumber(); - } - else{ + largestSentSequenceNumber=(int) dp.getPacketSequenceNumber(); + }else{ statistics.incNumberOfMissingDataEvents(); } }else{ @@ -388,7 +385,6 @@ waitForAck(); } } - //wait if(largestSentSequenceNumber % 16 !=0){ long snd=(long)session.getCongestionControl().getSendInterval(); @@ -445,13 +441,12 @@ * the next sequence number for data packets. * The initial sequence number is "0" */ - public long getNextSequenceNumber(){ - currentSequenceNumber=SequenceNumber.increment(currentSequenceNumber); - return currentSequenceNumber; + public int incrementAndGetSequenceNo(){ + return session.incrementAndGetSequenceNo(); } - public long getCurrentSequenceNumber(){ - return currentSequenceNumber; + public int getCurrentSequenceNumber(){ + return session.getCurrentSequenceNumber(); } /** Modified: udt-java/skunk/src/main/java/udt/UDTServerSocket.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 04:24:00 UTC (rev 60) @@ -31,35 +31,48 @@ *********************************************************************************/ package udt; +import java.io.IOException; import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketException; import java.net.UnknownHostException; +import java.nio.channels.ServerSocketChannel; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import java.util.logging.Logger; -public class UDTServerSocket { +public class UDTServerSocket extends ServerSocket { private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); - private final UDPEndPoint endpoint; + private volatile UDPEndPoint endpoint; + private volatile InetAddress localAdd; + private volatile int locPort; + private volatile SocketAddress localSocketAddress; - private boolean started=false; - + private volatile boolean started=false; + private volatile boolean bound = false; private volatile boolean shutdown=false; /** * create a UDT ServerSocket - * @param localAddress + * @param localSocketAddress * @param port - the local port. If 0, an ephemeral port will be chosen */ - public UDTServerSocket(InetAddress localAddress, int port)throws SocketException,UnknownHostException{ - endpoint=new UDPEndPoint(localAddress,port); + public UDTServerSocket(InetAddress localAddress, int port)throws UnknownHostException, IOException{ + super(); + endpoint= UDPEndPoint.get(localAddress,port); + localAdd = localAddress; + locPort = port; + bound = true; logger.info("Created server endpoint on port "+endpoint.getLocalPort()); } //starts a server on localhost - public UDTServerSocket(int port)throws SocketException,UnknownHostException{ + public UDTServerSocket(int port)throws IOException,UnknownHostException{ this(InetAddress.getLocalHost(),port); } @@ -68,13 +81,16 @@ * for the new connection * @return */ - public synchronized UDTSocket accept()throws InterruptedException{ +@Override + public synchronized Socket accept() throws IOException{ if(!started){ endpoint.start(true); started=true; } + // TODO: use a blocking queue. while(!shutdown){ - UDTSession session=endpoint.accept(10000, TimeUnit.MILLISECONDS); + try { + UDTSession session = endpoint.accept(10000, TimeUnit.MILLISECONDS, null); if(session!=null){ //wait for handshake to complete while(!session.isReady() || session.getSocket()==null){ @@ -82,16 +98,107 @@ } return session.getSocket(); } + } catch (InterruptedException ex) { + throw new IOException(ex); } - throw new InterruptedException(); } + throw new IOException("UDTSession was null"); + } - public void shutDown(){ + public UDPEndPoint getEndpoint(){ + return endpoint; + } + + @Override + public void bind(SocketAddress endpoint){ + //TODO: Implement ServerSocket.bind + } + + @Override + public void bind(SocketAddress endpoint, int timeout){ + //TODO: Implement ServerSocket.bind + } + + @Override + public void close(){ shutdown=true; + // TODO: The endpoint might have other ServerSocket's listening, + // we need to pass the endpoint the socket, or session or something + // the endpoint should only stop when it has no remaining sessions. endpoint.stop(); } - public UDPEndPoint getEndpoint(){ - return endpoint; + @Override + public ServerSocketChannel getChannel(){ + return null; } + + @Override + public InetAddress getInetAddress(){ + return localAdd; } + + @Override + public int getLocalPort(){ + return locPort; + } + + @Override + public SocketAddress getLocalSocketAddress(){ + return localSocketAddress; + } + + @Override + public int getReceiveBufferSize(){ + return 0; + } + + @Override + public boolean getReuseAddress(){ + return false; + } + + @Override + public int getSoTimeout(){ + return 0; + } + + @Override + public boolean isBound(){ + return started; + } + + @Override + public boolean isClosed(){ + return shutdown; + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth){ + + } + + @Override + public void setReceiveBufferSize(int size){ + + } + + @Override + public void setReuseAddress(boolean on){ + + } + + @Override + public void setSoTimeout(int timeout){ + + } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(120); + sb.append("UDTServerSocket: \n"); + //TODO: add statistics. + return sb.toString(); + } + +} Modified: udt-java/skunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 04:24:00 UTC (rev 60) @@ -33,12 +33,16 @@ package udt; import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; -import udt.packets.Destination; +import udt.packets.UDTSocketAddress; +import udt.util.SequenceNumber; import udt.util.UDTStatistics; public abstract class UDTSession { @@ -79,7 +83,7 @@ /** * remote UDT entity (address and socket ID) */ - protected final Destination destination; + protected final UDTSocketAddress destination; /** * local port @@ -101,16 +105,19 @@ */ protected int datagramSize=DEFAULT_DATAGRAM_SIZE; - protected Long initialSequenceNumber=null; + protected int initialSequenceNumber=11; - protected final long mySocketID; + private final AtomicInteger sequenceNo = new AtomicInteger(SequenceNumber.random()); - private final static AtomicLong nextSocketID=new AtomicLong(20+new Random().nextInt(5000)); + protected final int mySocketID; - public UDTSession(String description, Destination destination){ + private final static AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000)); + + public UDTSession(String description, UDTSocketAddress dest){ + InetSocketAddress inetAdd = null; statistics=new UDTStatistics(description); mySocketID=nextSocketID.incrementAndGet(); - this.destination=destination; + this.destination= dest; this.dgPacket=new DatagramPacket(new byte[0],0,destination.getAddress(),destination.getPort()); String clazzP=System.getProperty(CC_CLASS,UDTCongestionControl.class.getName()); Object ccObject=null; @@ -125,10 +132,8 @@ logger.info("Using "+cc.getClass().getName()); } + public abstract void received(UDTPacket packet, UDTSocketAddress peer); - public abstract void received(UDTPacket packet, Destination peer); - - public UDTSocket getSocket() { return socket; } @@ -137,7 +142,7 @@ return cc; } - public int getState() { + protected int getState() { return state; } @@ -149,7 +154,7 @@ this.socket = socket; } - public void setState(int state) { + protected void setState(int state) { logger.info(toString()+" connection state CHANGED to <"+state+">"); this.state = state; } @@ -170,7 +175,7 @@ return state==shutdown || state==invalid; } - public Destination getDestination() { + public UDTSocketAddress getDestination() { return destination; } @@ -202,21 +207,30 @@ return statistics; } - public long getSocketID(){ + public int getSocketID(){ return mySocketID; } - public synchronized long getInitialSequenceNumber(){ - if(initialSequenceNumber==null){ - initialSequenceNumber=1l; //TODO must be random? + public int getCurrentSequenceNumber(){ + return sequenceNo.get(); } - return initialSequenceNumber; + + public int incrementAndGetSequenceNo(){ + while (true){ + int sequence = sequenceNo.get(); + int increment = SequenceNumber.increment(sequence); + boolean success = false; + success = sequenceNo.compareAndSet(sequence, increment); + if (success) return increment; } + } - public synchronized void setInitialSequenceNumber(long initialSequenceNumber){ - this.initialSequenceNumber=initialSequenceNumber; + protected void setInitialSequenceNumber(int initialSequenceNumber){ + if (state == handshaking){ + sequenceNo.set(initialSequenceNumber); } + } public DatagramPacket getDatagram(){ return dgPacket; Modified: udt-java/skunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 04:24:00 UTC (rev 60) @@ -32,10 +32,23 @@ package udt; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketException; +import java.net.SocketOptions; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.security.AccessController; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import udt.packets.UDTSocketAddress; /** * UDTSocket is analogous to a normal java.net.Socket, it provides input and * output streams for the application @@ -43,34 +56,443 @@ * TODO is it possible to actually extend java.net.Socket ? * */ -public class UDTSocket { +public class UDTSocket extends Socket{ + private static final ArrayList<UDTSocketAddress> boundSockets + = new ArrayList<UDTSocketAddress>(120); + //endpoint - private final UDPEndPoint endpoint; + private volatile UDPEndPoint endpoint; private volatile boolean active; + private volatile boolean connected; + private volatile boolean bound; + private volatile boolean shutIn; // receiver closed. + private volatile boolean shutOut; // sender closed. + private volatile boolean closed; //processing received data - private UDTReceiver receiver; - private UDTSender sender; + private volatile UDTReceiver receiver; + private volatile UDTSender sender; - private final UDTSession session; + private volatile UDTSession session; - private UDTInputStream inputStream; - private UDTOutputStream outputStream; + private volatile UDTInputStream inputStream; + private volatile UDTOutputStream outputStream; + + private volatile UDTSocketAddress localSocketAddress; + private volatile UDTSocketAddress destination; /** + * The session is usually the caller for this constructor + * so the session already knows this is the socket. * @param host * @param port * @param endpoint * @throws SocketException,UnknownHostException */ - public UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{ + UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{ + super(); this.endpoint=endpoint; this.session=session; this.receiver=new UDTReceiver(session,endpoint); this.sender=new UDTSender(session,endpoint); + localSocketAddress = new UDTSocketAddress(endpoint.getLocalAddress(), + endpoint.getLocalPort(), session.getSocketID()); + destination = session.getDestination(); + bound = true; } + public UDTSocket(InetAddress host, int port ) throws SocketException, + UnknownHostException{ + super(); + this.endpoint = UDPEndPoint.get(host, port); + this.session = null; + this.receiver = null; + this.sender = null; + active = false; + bound = true; + + } + + public UDTSocket(){ + super(); + endpoint = null; + session = null; + receiver = null; + sender = null; + active = false; + bound = false; + } + + @Override + public void connect(SocketAddress destination) throws IOException { + connect(destination, 0); + } + + @Override + public void connect(SocketAddress destination, int timeout) throws IOException { + if (destination == null) throw new IllegalArgumentException("connect: The address can't be null"); + if (timeout < 0) throw new IllegalArgumentException("connect: timeout can't be negative"); + if (isClosed()) throw new SocketException("Socket is closed"); + if (isConnected()) throw new SocketException("already connected"); + if (!(destination instanceof UDTSocketAddress)) + throw new IllegalArgumentException("Unsupported address type"); + + UDTSocketAddress epoint = (UDTSocketAddress) destination; + InetAddress addr = epoint.getAddress(); + int port = epoint.getPort(); + + SecurityManager security = System.getSecurityManager(); + if (security != null) { + security.checkConnect(addr.getHostAddress(),port); + } + if (session == null){ + session = new ClientSession(endpoint, (UDTSocketAddress) destination); + session.setSocket(this); + } + endpoint.addSession(session.getSocketID(), session); + receiver = new UDTReceiver(session, endpoint); + sender = new UDTSender(session, endpoint); + localSocketAddress = new UDTSocketAddress(endpoint.getLocalAddress(), + endpoint.getLocalPort(), session.getSocketID()); + endpoint.start(); + try { + ((ClientSession)session).connect(); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + destination = session.getDestination(); + connected = true; + /* + * If the socket was not bound before the connect, it is now because + * the kernel will have picked an ephemeral port & a local address + */ + bound = true; + } + + /** + * Binds the socket to a local address. + * <P> + * If the address is <code>null</code>, then the system will pick up + * an ephemeral port and a valid local address to bind the socket. + * + * @param bindpoint the <code>SocketAddress</code> to bind to + * @throws IOException if the bind operation fails, or if the socket + * is already bound. + * @throws IllegalArgumentException if bindpoint is a + * SocketAddress subclass not supported by this socket + * + * @since 1.4 + * @see #isBound + */ + public void bind(SocketAddress bindpoint) throws IOException { + if (!(bindpoint instanceof UDTSocketAddress)) + throw new IllegalArgumentException("Unsupported SocketAddress type"); + if (isBound()) throw new IOException("Socket already bound"); + synchronized (boundSockets) { + if (boundSockets.contains(bindpoint)) throw + new IOException("A socket is already bound to this address"); + } + endpoint = UDPEndPoint.get(bindpoint); + if (endpoint == null) throw new SocketException("Failed to bind to UDPEndPoint"); + bound = true; + } + + @Override + public InetAddress getInetAddress() { + if (!isConnected()) return null; + return destination.getAddress(); + } + + // Inherit javadoc. + @Override + public InetAddress getLocalAddress() { + // This is for backward compatibility only, the super is not bound and + // returns InetAddress.anyLocalAddress(); + if (!isBound()) super.getLocalAddress(); + return localSocketAddress.getAddress(); + } + + // Inherit javadoc. + @Override + public int getPort() { + if (!isConnected()) return 0; + return destination.getPort(); + } + + // Inherit javadoc. + @Override + public int getLocalPort() { + if (!isBound()) return -1; + return localSocketAddress.getPort(); + } + + // Inherit javadoc. + @Override + public SocketAddress getRemoteSocketAddress() { + if (!isConnected()) return null; + return destination; + } + + // Inherit javadoc. + @Override + public SocketAddress getLocalSocketAddress() { + if (!isBound()) return null; + return localSocketAddress; + } + + @Override + public SocketChannel getChannel() { + return null; + } + + /** + * Not supported + * @param on + * @throws SocketException + */ + @Override + public void setTcpNoDelay(boolean on) throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + setOption(SocketOptions.TCP_NODELAY, Boolean.valueOf(on)); + } + + /** + * Not supported + * @return false + * @throws SocketException + */ + @Override + public boolean getTcpNoDelay() throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + return ((Boolean) getOption(SocketOptions.TCP_NODELAY)).booleanValue(); + } + + /** + * Not supported. + * @param on + * @param linger + * @throws SocketException + */ + @Override + public void setSoLinger(boolean on, int linger) throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + if (linger < 0) throw new IllegalArgumentException("SO_LINGER cannot be less than zero"); + // do nothing, not supported. + } + + @Override + public int getSoLinger() throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + return -1; // implies this option is disabled + } + + // Sending of urgent data, should we support it? + @Override + public void sendUrgentData (int data) throws IOException { + throw new SocketException ("Urgent data not supported"); + } + + // Sending of urgent data, should we enable it? + @Override + public void setOOBInline(boolean on) throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + setOption(SocketOptions.SO_OOBINLINE, Boolean.valueOf(on)); + } + + @Override + public boolean getOOBInline() throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + return ((Boolean) getOption(SocketOptions.SO_OOBINLINE)).booleanValue(); + } + + // TODO: implement set socket timeout. + @Override + public void setSoTimeout(int timeout) throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + if (timeout < 0) throw new IllegalArgumentException("negative timeout not allowed"); + setOption(SocketOptions.SO_TIMEOUT, new Integer(timeout)); + } + + // TODO: Implement get socket timeout. + @Override + public synchronized int getSoTimeout() throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + Object o = getOption(SocketOptions.SO_TIMEOUT); + if (o instanceof Integer) return ((Integer) o).intValue(); + return 0; + } + + // object method signature compatibility only, not currently supported. + @Override + public void setSendBufferSize(int size) + throws SocketException{ + if (!(size > 0)) throw new IllegalArgumentException("negative send size not allowed"); + if (isClosed()) throw new SocketException("Socket closed"); + setOption(SocketOptions.SO_SNDBUF, new Integer(size)); + } + + // object method signature compatibility only, not currently supported. + @Override + public int getSendBufferSize() throws SocketException { + if (isClosed()) throw new SocketException("Socket is closed"); + int result = 0; + Object o = getOption(SocketOptions.SO_SNDBUF); + if (o instanceof Integer) result = ((Integer)o).intValue(); + return result; + } + + // object method signature compatibility only, not currently supported. + @Override + public void setReceiveBufferSize(int size) + throws SocketException{ + if (size <= 0) throw new IllegalArgumentException("invalid receive size"); + if (isClosed()) throw new SocketException("Socket closed"); + setOption(SocketOptions.SO_RCVBUF, new Integer(size)); + } + + // object method signature compatibility only, not currently supported. + @Override + public int getReceiveBufferSize() + throws SocketException{ + if (isClosed()) throw new SocketException("Socket closed"); + int result = 0; + Object o = getOption(SocketOptions.SO_RCVBUF); + if (o instanceof Integer) { + result = ((Integer)o).intValue(); + } + return result; + } + + // TODO: Implement keep alive. + @Override + public void setKeepAlive(boolean on) throws SocketException { + if (isClosed()) + throw new SocketException("Socket is closed"); + setOption(SocketOptions.SO_KEEPALIVE, Boolean.valueOf(on)); + } + + + @Override + public boolean getKeepAlive() throws SocketException { + // Keep alive is not currently implemented in the udt session. + if (isClosed()) + throw new SocketException("Socket is closed"); + return ((Boolean) getOption(SocketOptions.SO_KEEPALIVE)).booleanValue(); + } + + public void setTrafficClass(int tc) throws SocketException { + // Safe to ignore, not supported. + } + + public int getTrafficClass() throws SocketException { + // Call redirected to underlying DatagramSocket. + return endpoint.getSocket().getTrafficClass(); + } + + @Override + public void setReuseAddress(boolean on) throws SocketException { + throw new SocketException("SO_REUSEADDR not supported"); + } + + /** + * Tests if SO_REUSEADDR is enabled. + * + * @return a <code>boolean</code> indicating whether or not SO_REUSEADDR is enabled. + * @exception SocketException if there is an error + * in the underlying protocol, such as a TCP error. + * @since 1.4 + * @see #setReuseAddress(boolean) + */ + @Override + public boolean getReuseAddress() throws SocketException { + if (isClosed()) throw new SocketException("Socket is closed"); + return ((Boolean) (getOption(SocketOptions.SO_REUSEADDR))).booleanValue(); + } + + /** + * Places the input stream for this socket at "end of stream". + * Any data sent to the input stream side of the socket is acknowledged + * and then silently discarded. + * <p> + * If you read from a socket input stream after invoking + * shutdownInput() on the socket, the stream will return EOF. + * + * @exception IOException if an I/O error occurs when shutting down this + * socket. + * + * @since 1.3 + * @see java.net.Socket#shutdownOutput() + * @see java.net.Socket#close() + * @see java.net.Socket#setSoLinger(boolean, int) + * @see #isInputShutdown + */ + @Override + public void shutdownInput() throws IOException + { + if (isClosed()) throw new SocketException("Socket closed"); + if (!isConnected()) throw new SocketException("Socket not connected"); + if (isInputShutdown()) throw new SocketException("Socket input already shutdown"); + receiver.stop(); + inputStream.close(); + shutIn = true; + } + + @Override + public void shutdownOutput() throws IOException + { + if (isClosed()) throw new SocketException("Socket closed"); + if (!isConnected()) throw new SocketException("Socket is not connected"); + if (isOutputShutdown()) throw new SocketException("Socket output is already shutdown"); + sender.stop(); + outputStream.close(); + shutOut = true; + } + + /** + * Returns the connection state of the socket. + * + * @return true if the socket successfuly connected to a server + * @since 1.4 + */ + @Override + public boolean isConnected() { + // Before 1.3 Sockets were always connected during creation + return connected; + } + + @Override + public boolean isBound() { + return bound ; + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public boolean isInputShutdown() { + return shutIn; + } + + /** + * Returns whether the write-half of the socket connection is closed. + * + * @return true if the output of the socket has been shutdown + * @since 1.4 + * @see #shutdownOutput + */ + @Override + public boolean isOutputShutdown() { + return shutOut; + } + + // some preliminary support for socket options. + private Object getOption(int optID) { + return Boolean.FALSE; + } + public UDTReceiver getReceiver() { return receiver; } @@ -87,9 +509,9 @@ this.sender = sender; } - public void setActive(boolean active) { - this.active = active; - } +// public void setActive(boolean active) { +// this.active = active; +// } public boolean isActive() { return active; @@ -103,7 +525,8 @@ * get the input stream for reading from this socket * @return */ - public synchronized UDTInputStream getInputStream()throws IOException{ + @Override + public synchronized InputStream getInputStream()throws IOException{ if(inputStream==null){ inputStream=new UDTInputStream(this); } @@ -114,7 +537,8 @@ * get the output stream for writing to this socket * @return */ - public synchronized UDTOutputStream getOutputStream(){ + @Override + public synchronized OutputStream getOutputStream(){ if(outputStream==null){ outputStream=new UDTOutputStream(this); } @@ -126,7 +550,7 @@ } /** - * write single block of data without waiting for any acknowledgement + * write single block of data without waiting for any acknowledgment * @param data */ protected void doWrite(byte[]data)throws IOException{ @@ -176,6 +600,8 @@ /** * will block until the outstanding packets have really been sent out * and acknowledged + * + * @throws InterruptedException */ protected void flush() throws InterruptedException{ if(!active)return; @@ -200,14 +626,38 @@ flush(); } + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(400); + sb .append("UDTSocket: \n") + .append("Local address: ") + ... [truncated message content] |
From: <pe...@us...> - 2011-08-05 01:14:34
|
Revision: 59 http://udt-java.svn.sourceforge.net/udt-java/?rev=59&view=rev Author: pete_ Date: 2011-08-05 01:14:28 +0000 (Fri, 05 Aug 2011) Log Message: ----------- Skunk experimental branch Added Paths: ----------- udt-java/skunk/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <pe...@us...> - 2011-08-05 01:13:57
|
Revision: 58 http://udt-java.svn.sourceforge.net/udt-java/?rev=58&view=rev Author: pete_ Date: 2011-08-05 01:13:51 +0000 (Fri, 05 Aug 2011) Log Message: ----------- Skunk experimental branch Removed Paths: ------------- udt-java/skunk/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <pe...@us...> - 2011-08-05 01:12:04
|
Revision: 57 http://udt-java.svn.sourceforge.net/udt-java/?rev=57&view=rev Author: pete_ Date: 2011-08-05 01:11:58 +0000 (Fri, 05 Aug 2011) Log Message: ----------- Skunk experimental branch Added Paths: ----------- udt-java/skunk/trunk/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <pe...@us...> - 2011-08-05 01:09:47
|
Revision: 56 http://udt-java.svn.sourceforge.net/udt-java/?rev=56&view=rev Author: pete_ Date: 2011-08-05 01:09:41 +0000 (Fri, 05 Aug 2011) Log Message: ----------- Move skunk down in directory tree Added Paths: ----------- udt-java/skunk/ Removed Paths: ------------- skunk/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <pe...@us...> - 2011-08-05 01:04:17
|
Revision: 55 http://udt-java.svn.sourceforge.net/udt-java/?rev=55&view=rev Author: pete_ Date: 2011-08-05 01:04:11 +0000 (Fri, 05 Aug 2011) Log Message: ----------- Skunk experimental branch Added Paths: ----------- skunk/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2011-02-17 21:24:39
|
Revision: 54 http://udt-java.svn.sourceforge.net/udt-java/?rev=54&view=rev Author: bschuller Date: 2011-02-17 21:24:32 +0000 (Thu, 17 Feb 2011) Log Message: ----------- bit of refactoring of sender to avoid memory allocations for data and data packets Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/UDTSocket.java udt-java/trunk/src/main/java/udt/packets/DataPacket.java udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java udt-java/trunk/src/main/java/udt/util/UDTStatistics.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Added Paths: ----------- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java udt-java/trunk/src/test/java/udt/sender/ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-02-17 21:24:32 UTC (rev 54) @@ -168,6 +168,7 @@ } }; Thread t=UDTThreadFactory.get().newThread(receive); + t.setName("UDPEndpoint-"+t.getName()); t.setDaemon(true); t.start(); logger.info("UDTEndpoint started."); Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-02-17 21:24:32 UTC (rev 54) @@ -214,6 +214,8 @@ } }; receiverThread=UDTThreadFactory.get().newThread(r); + String s=(session instanceof ServerSession)? "ServerSession": "ClientSession"; + receiverThread.setName("UDTReceiver-"+s+"-"+receiverThread.getName()); receiverThread.start(); } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-02-17 21:24:32 UTC (rev 54) @@ -33,9 +33,8 @@ package udt; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -49,6 +48,7 @@ import udt.packets.DataPacket; import udt.packets.KeepAlive; import udt.packets.NegativeAcknowledgement; +import udt.sender.FlowWindow; import udt.sender.SenderLossList; import udt.util.MeanThroughput; import udt.util.MeanValue; @@ -76,13 +76,12 @@ //senderLossList stores the sequence numbers of lost packets //fed back by the receiver through NAK pakets private final SenderLossList senderLossList; - + //sendBuffer stores the sent data packets and their sequence numbers - private final Map<Long,DataPacket>sendBuffer; - - //sendQueue contains the packets to send - private final BlockingQueue<DataPacket>sendQueue; - + private final Map<Long,byte[]>sendBuffer; + + private final FlowWindow flowWindow; + //thread reading packets from send queue and sending them private Thread senderThread; @@ -117,15 +116,18 @@ private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); private final boolean storeStatistics; - + + private final int chunksize; + public UDTSender(UDTSession session,UDPEndPoint endpoint){ if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); this.endpoint= endpoint; this.session=session; statistics=session.getStatistics(); senderLossList=new SenderLossList(); - sendBuffer=new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(),0.75f,2); - sendQueue = new ArrayBlockingQueue<DataPacket>(session.getFlowWindowSize(), /*fairness*/ true); + sendBuffer=new ConcurrentHashMap<Long, byte[]>(session.getFlowWindowSize(),0.75f,2); + chunksize=session.getDatagramSize()-24;//need space for the header; + flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize); lastAckSequenceNumber=session.getInitialSequenceNumber(); currentSequenceNumber=session.getInitialSequenceNumber()-1; waitForAckLatch.set(new CountDownLatch(1)); @@ -179,16 +181,14 @@ } }; senderThread=UDTThreadFactory.get().newThread(r); + String s=(session instanceof ServerSession)? "ServerSession": "ClientSession"; + senderThread.setName("UDTSender-"+s+"-"+senderThread.getName()); senderThread.start(); } /** * sends the given data packet, storing the relevant information - * - * @param data - * @throws IOException - * @throws InterruptedException */ private void send(DataPacket p)throws IOException{ synchronized(sendLock){ @@ -203,28 +203,63 @@ throughput.end(); throughput.begin(); } - sendBuffer.put(p.getPacketSequenceNumber(), p); + sendBuffer.put(p.getPacketSequenceNumber(), p.getData()); unacknowledged.incrementAndGet(); } statistics.incNumberOfSentDataPackets(); } + protected void sendUdtPacket(ByteBuffer bb, int timeout, TimeUnit units)throws IOException, InterruptedException{ + if(!started)start(); + DataPacket packet=null; + do{ + packet=flowWindow.getForProducer(); + if(packet==null){ + Thread.sleep(10); + } + }while(packet==null);//TODO check timeout... + try{ + packet.setPacketSequenceNumber(getNextSequenceNumber()); + packet.setSession(session); + packet.setDestinationID(session.getDestination().getSocketID()); + int len=Math.min(bb.remaining(),chunksize); + byte[] data=packet.getData(); + bb.get(data,0,len); + packet.setLength(len); + }finally{ + flowWindow.produce(); + } + + } + /** - * writes a data packet into the sendQueue, waiting at most for the specified time + * writes a data packet, waiting at most for the specified time * if this is not possible due to a full send queue * - * @return <code>true</code>if the packet was added, <code>false</code> if the - * packet could not be added because the queue was full - * @param p * @param timeout * @param units * @return * @throws IOException * @throws InterruptedException */ - protected boolean sendUdtPacket(DataPacket p, int timeout, TimeUnit units)throws IOException,InterruptedException{ + protected void sendUdtPacket(byte[]data, int timeout, TimeUnit units)throws IOException, InterruptedException{ if(!started)start(); - return sendQueue.offer(p,timeout,units); + DataPacket packet=null; + do{ + packet=flowWindow.getForProducer(); + if(packet==null){ + Thread.sleep(10); + // System.out.println("queue full: "+flowWindow); + } + }while(packet==null); + try{ + packet.setPacketSequenceNumber(getNextSequenceNumber()); + packet.setSession(session); + packet.setDestinationID(session.getDestination().getSocketID()); + packet.setData(data); + }finally{ + flowWindow.produce(); + } } //receive a packet from server from the peer @@ -268,6 +303,7 @@ for(long s=lastAckSequenceNumber;s<ackNumber;s++){ synchronized (sendLock) { removed=sendBuffer.remove(s)!=null; + senderLossList.remove(s); } if(removed){ unacknowledged.decrementAndGet(); @@ -291,7 +327,7 @@ session.getCongestionControl().onLoss(nak.getDecodedLossInfo()); session.getSocket().getReceiver().resetEXPTimer(); statistics.incNumberOfNAKReceived(); - + if(logger.isLoggable(Level.FINER)){ logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, " +"set send period to "+session.getCongestionControl().getSendInterval()); @@ -322,13 +358,11 @@ public void senderAlgorithm()throws InterruptedException, IOException{ while(!paused){ iterationStart=Util.getCurrentTime(); - //if the sender's loss list is not empty - if (!senderLossList.isEmpty()) { - Long entry=senderLossList.getFirstEntry(); - handleResubmit(entry); + Long entry=senderLossList.getFirstEntry(); + if(entry!=null){ + handleRetransmit(entry); } - else { //if the number of unacknowledged data packets does not exceed the congestion @@ -336,9 +370,9 @@ int unAcknowledged=unacknowledged.get(); if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() - && unAcknowledged<session.getFlowWindowSize()){ + && unAcknowledged<session.getFlowWindowSize()){ //check for application data - DataPacket dp=sendQueue.poll(); + DataPacket dp=flowWindow.consumeData(); if(dp!=null){ send(dp); largestSentSequenceNumber=dp.getPacketSequenceNumber(); @@ -374,15 +408,21 @@ } /** - * re-submits an entry from the sender loss list + * re-transmit an entry from the sender loss list * @param entry */ - protected void handleResubmit(Long seqNumber){ + protected void handleRetransmit(Long seqNumber){ try { //retransmit the packet and remove it from the list - DataPacket pktToRetransmit = sendBuffer.get(seqNumber); - if(pktToRetransmit!=null){ - endpoint.doSend(pktToRetransmit); + byte[]data=sendBuffer.get(seqNumber); + if(data!=null){ + //System.out.println("re-transmit "+data); + DataPacket packet=new DataPacket(); + packet.setPacketSequenceNumber(seqNumber); + packet.setSession(session); + packet.setDestinationID(session.getDestination().getSocketID()); + packet.setData(data); + endpoint.doSend(packet); statistics.incNumberOfRetransmittedDataPackets(); } }catch (Exception e) { @@ -457,14 +497,14 @@ */ public void waitForAck()throws InterruptedException{ waitForAckLatch.set(new CountDownLatch(1)); - waitForAckLatch.get().await(2, TimeUnit.MILLISECONDS); + waitForAckLatch.get().await(200, TimeUnit.MICROSECONDS); } public void stop(){ stopped=true; } - + public void pause(){ startLatch=new CountDownLatch(1); paused=true; Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2011-02-17 21:24:32 UTC (rev 54) @@ -74,7 +74,7 @@ * flow window size, i.e. how many data packets are * in-flight at a single time */ - protected int flowWindowSize=1024; + protected int flowWindowSize=1024*10; /** * remote UDT entity (address and socket ID) Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSocket.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2011-02-17 21:24:32 UTC (rev 54) @@ -36,46 +36,41 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; - -import udt.packets.DataPacket; - /** * UDTSocket is analogous to a normal java.net.Socket, it provides input and * output streams for the application * * TODO is it possible to actually extend java.net.Socket ? * - * */ public class UDTSocket { - + //endpoint private final UDPEndPoint endpoint; - + private volatile boolean active; - - //processing received data + + //processing received data private UDTReceiver receiver; private UDTSender sender; - + private final UDTSession session; private UDTInputStream inputStream; private UDTOutputStream outputStream; - /** - * @param host - * @param port - * @param endpoint - * @throws SocketException,UnknownHostException - */ + * @param host + * @param port + * @param endpoint + * @throws SocketException,UnknownHostException + */ public UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{ this.endpoint=endpoint; this.session=session; this.receiver=new UDTReceiver(session,endpoint); this.sender=new UDTSender(session,endpoint); } - + public UDTReceiver getReceiver() { return receiver; } @@ -114,7 +109,7 @@ } return inputStream; } - + /** * get the output stream for writing to this socket * @return @@ -125,20 +120,20 @@ } return outputStream; } - + public final UDTSession getSession(){ return session; } - + /** * write single block of data without waiting for any acknowledgement * @param data */ protected void doWrite(byte[]data)throws IOException{ doWrite(data, 0, data.length); - + } - + /** * write the given data * @param data - the data array @@ -148,14 +143,14 @@ */ protected void doWrite(byte[]data, int offset, int length)throws IOException{ try{ - doWrite(data, offset, length, Integer.MAX_VALUE, TimeUnit.MILLISECONDS); + doWrite(data, offset, length, 10, TimeUnit.MILLISECONDS); }catch(InterruptedException ie){ IOException io=new IOException(); io.initCause(ie); throw io; } } - + /** * write the given data, waiting at most for the specified time if the queue is full * @param data @@ -167,26 +162,17 @@ * @throws InterruptedException */ protected void doWrite(byte[]data, int offset, int length, int timeout, TimeUnit units)throws IOException,InterruptedException{ - int chunksize=session.getDatagramSize()-24;//need some bytes for the header ByteBuffer bb=ByteBuffer.wrap(data,offset,length); - long seqNo=0; while(bb.remaining()>0){ - int len=Math.min(bb.remaining(),chunksize); - byte[]chunk=new byte[len]; - bb.get(chunk); - DataPacket packet=new DataPacket(); - seqNo=sender.getNextSequenceNumber(); - packet.setPacketSequenceNumber(seqNo); - packet.setSession(session); - packet.setDestinationID(session.getDestination().getSocketID()); - packet.setData(chunk); - //put the packet into the send queue - if(!sender.sendUdtPacket(packet, timeout, units)){ - throw new IOException("Queue full"); + try{ + sender.sendUdtPacket(bb, timeout, units); + }catch(Exception ex){ + ex.printStackTrace(); } } if(length>0)active=true; } + /** * will block until the outstanding packets have really been sent out * and acknowledged @@ -207,13 +193,13 @@ //TODO need to check if we can pause the sender... //sender.pause(); } - + //writes and wait for ack protected void doWriteBlocking(byte[]data)throws IOException, InterruptedException{ doWrite(data); flush(); } - + /** * close the connection * @throws IOException Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-02-17 21:24:32 UTC (rev 54) @@ -44,12 +44,15 @@ private long destinationID; private UDTSession session; + + private int dataLength; public DataPacket(){ } /** - * create a DataPacket + * create a DataPacket from the given raw data + * * @param encodedData - network data */ public DataPacket(byte[] encodedData){ @@ -58,6 +61,7 @@ public DataPacket(byte[] encodedData, int length){ decode(encodedData,length); + dataLength=length; } void decode(byte[]encodedData,int length){ @@ -75,16 +79,16 @@ } public double getLength(){ - return data.length; + return dataLength; } - /* - * aplivation data - * @param - */ - + public void setLength(int length){ + dataLength=length; + } + public void setData(byte[] data) { this.data = data; + dataLength=data.length; } public long getPacketSequenceNumber() { @@ -125,12 +129,12 @@ */ public byte[] getEncoded(){ //header.length is 16 - byte[] result=new byte[16+data.length]; + byte[] result=new byte[16+dataLength]; System.arraycopy(PacketUtil.encode(packetSequenceNumber), 0, result, 0, 4); System.arraycopy(PacketUtil.encode(messageNumber), 0, result, 4, 4); System.arraycopy(PacketUtil.encode(timeStamp), 0, result, 8, 4); System.arraycopy(PacketUtil.encode(destinationID), 0, result, 12, 4); - System.arraycopy(data, 0, result, 16, data.length); + System.arraycopy(data, 0, result, 16, dataLength); return result; } Modified: udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2011-02-17 21:24:32 UTC (rev 54) @@ -39,12 +39,17 @@ */ public class PacketHistoryWindow extends CircularArray<Long>{ + private final long[]intervals; + private final int num; + /** * create a new PacketHistoryWindow of the given size * @param size */ public PacketHistoryWindow(int size){ super(size); + num=max-1; + intervals=new long[num]; } /** @@ -54,12 +59,11 @@ */ public long getPacketArrivalSpeed(){ if(!haveOverflow)return 0; - int num=max-1; + double AI; double medianPacketArrivalSpeed; double total=0; int count=0; - long[]intervals=new long[num]; int pos=position-1; if(pos<0)pos=num; do{ Added: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java (rev 0) +++ udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2011-02-17 21:24:32 UTC (rev 54) @@ -0,0 +1,139 @@ +package udt.sender; + +import java.util.concurrent.locks.ReentrantLock; + +import udt.packets.DataPacket; + +/** + * + * holds a fixed number of {@link DataPacket} instances which are sent out. + * + * it is assumed that a single thread stores new data, and another single thread + * reads/removes data + * + * @author schuller + */ +public class FlowWindow { + + private final DataPacket[]packets; + + private final int length; + + private volatile boolean isEmpty=true; + + private volatile boolean isFull=false; + + private volatile int validEntries=0; + + private volatile boolean isCheckout=false; + + private volatile int writePos=0; + + private volatile int readPos=-1; + + private volatile int consumed=0; + + private volatile int produced=0; + + private final ReentrantLock lock; + + /** + * @param size - flow window size + * @param chunksize - data chunk size + */ + public FlowWindow(int size, int chunksize){ + this.length=size; + packets=new DataPacket[length]; + for(int i=0;i<packets.length;i++){ + packets[i]=new DataPacket(); + packets[i].setData(new byte[chunksize]); + } + lock=new ReentrantLock(true); + } + + /** + * get a data packet for updating with new data + * + * @return <code>null</code> if flow window is full + */ + public DataPacket getForProducer(){ + lock.lock(); + try{ + if(isFull){ + return null; + } + if(isCheckout)throw new IllegalStateException(); + isCheckout=true; + DataPacket p=packets[writePos]; + return p; + }finally{ + lock.unlock(); + } + } + + public void produce(){ + lock.lock(); + try{ + isCheckout=false; + writePos++; + if(writePos==length)writePos=0; + validEntries++; + isFull=validEntries==length-1; + isEmpty=false; + produced++; + }finally{ + lock.unlock(); + } + } + + + public DataPacket consumeData(){ + if(isEmpty){ + return null; + } + lock.lock(); + try{ + readPos++; + DataPacket p=packets[readPos]; + if(readPos==length-1)readPos=-1; + validEntries--; + isEmpty=validEntries==0; + isFull=false; + consumed++; + return p; + }finally{ + lock.unlock(); + } + } + + boolean isEmpty(){ + return isEmpty; + } + + /** + * check if another entry can be added + * @return + */ + public boolean isFull(){ + return isFull; + } + + int readPos(){ + return readPos; + } + + int writePos(){ + return writePos; + } + + int consumed(){ + return consumed; + } + public String toString(){ + StringBuilder sb=new StringBuilder(); + sb.append("FlowWindow size=").append(length); + sb.append(" full=").append(isFull).append(" empty=").append(isEmpty); + sb.append(" consumed=").append(consumed).append(" produced=").append(produced); + return sb.toString(); + } +} Property changes on: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java ___________________________________________________________________ Added: svn:mime-type + text/plain Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-02-17 21:24:32 UTC (rev 54) @@ -33,8 +33,6 @@ package udt.sender; import java.util.LinkedList; -import udt.util.MeanValue; - /** * stores the sequence number of the lost packets in increasing order */ @@ -57,14 +55,19 @@ backingList.add(i,obj); return; } - else if(obj==entry)return; + else if(obj.equals(entry))return; } backingList.add(obj); } } + public void remove(Long obj){ + synchronized (backingList) { + backingList.remove(obj); + } + } /** - * retrieves the loss list entry with the lowest sequence number + * retrieves the loss list entry with the lowest sequence number, or <code>null</code> if loss list is empty */ public Long getFirstEntry(){ synchronized(backingList){ Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-02-17 21:24:32 UTC (rev 54) @@ -109,7 +109,6 @@ while(true)Thread.sleep(10000); } - File file=new File(new String(localFile)); System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">"); FileOutputStream fos=new FileOutputStream(file); Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2011-02-17 21:24:32 UTC (rev 54) @@ -188,7 +188,7 @@ sb.append("Duplicate data packets: ").append(getNumberOfDuplicateDataPackets()).append("\n"); sb.append("ACK received: ").append(getNumberOfACKReceived()).append("\n"); sb.append("NAK received: ").append(getNumberOfNAKReceived()).append("\n"); - sb.append("Retransmitted data: ").append(getNumberOfNAKReceived()).append("\n"); + sb.append("Retransmitted data: ").append(getNumberOfRetransmittedDataPackets()).append("\n"); sb.append("NAK sent: ").append(getNumberOfNAKSent()).append("\n"); sb.append("ACK sent: ").append(getNumberOfACKSent()).append("\n"); if(roundTripTime>0){ Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-02-17 21:24:32 UTC (rev 54) @@ -21,7 +21,7 @@ boolean running=false; //how many - int num_packets=300; + int num_packets=100; //how large is a single packet int size=1*1024*1024; @@ -55,7 +55,7 @@ new Random().nextBytes(data); MessageDigest digest=MessageDigest.getInstance("MD5"); - while(!serverRunning)Thread.sleep(100); + while(!serverStarted)Thread.sleep(100); long start=System.currentTimeMillis(); System.out.println("Sending <"+num_packets+"> packets of <"+format.format(size/1024.0/1024.0)+"> Mbytes each"); long end=0; @@ -101,6 +101,7 @@ long total=0; volatile boolean serverRunning=true; + volatile boolean serverStarted=false; volatile String md5_received=null; @@ -110,6 +111,7 @@ Runnable serverProcess=new Runnable(){ public void run(){ + try{ Boolean devNull=Boolean.getBoolean("udt.dev.null"); if(devNull){ @@ -118,6 +120,7 @@ MessageDigest md5=MessageDigest.getInstance("MD5"); long start=System.currentTimeMillis(); UDTSocket s=serverSocket.accept(); + serverStarted=true; assertNotNull(s); UDTInputStream is=s.getInputStream(); byte[]buf=new byte[READ_BUFFERSIZE]; Added: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java =================================================================== --- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java (rev 0) +++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2011-02-17 21:24:32 UTC (rev 54) @@ -0,0 +1,151 @@ +package udt.sender; + +import java.util.concurrent.TimeoutException; + +import junit.framework.TestCase; +import udt.packets.DataPacket; + +public class TestFlowWindow extends TestCase { + + public void testFillWindow()throws InterruptedException, TimeoutException{ + FlowWindow fw=new FlowWindow(3, 128); + DataPacket p1=fw.getForProducer(); + assertNotNull(p1); + fw.produce(); + DataPacket p2=fw.getForProducer(); + assertNotNull(p2); + fw.produce(); + assertFalse(p1==p2); + DataPacket p3=fw.getForProducer(); + assertNotNull(p3); + assertFalse(p1==p3); + assertFalse(p2==p3); + fw.produce(); + assertTrue(fw.isFull()); + + DataPacket no=fw.getForProducer(); + assertNull("Window should be full",no); + + DataPacket c1=fw.consumeData(); + //must be p1 + assertTrue(c1==p1); + DataPacket c2=fw.consumeData(); + //must be p2 + assertTrue(c2==p2); + DataPacket c3=fw.consumeData(); + //must be p3 + assertTrue(c3==p3); + assertTrue(fw.isEmpty()); + } + + public void testOverflow()throws InterruptedException, TimeoutException{ + FlowWindow fw=new FlowWindow(3, 64); + DataPacket p1=fw.getForProducer(); + assertNotNull(p1); + fw.produce(); + DataPacket p2=fw.getForProducer(); + assertNotNull(p2); + fw.produce(); + assertFalse(p1==p2); + DataPacket p3=fw.getForProducer(); + assertNotNull(p3); + assertFalse(p1==p3); + assertFalse(p2==p3); + fw.produce(); + assertTrue(fw.isFull()); + + //read one + DataPacket c1=fw.consumeData(); + //must be p1 + assertTrue(c1==p1); + assertFalse(fw.isFull()); + + //now a slot for writing should be free again + DataPacket p4=fw.getForProducer(); + assertNotNull(p4); + fw.produce(); + //which is again p1 + assertTrue(p4==p1); + + } + + private volatile boolean fail=false; + + public void testConcurrentReadWrite()throws InterruptedException{ + final FlowWindow fw=new FlowWindow(20, 64); + Thread reader=new Thread(new Runnable(){ + public void run(){ + doRead(fw); + } + }); + reader.setName("reader"); + Thread writer=new Thread(new Runnable(){ + public void run(){ + doWrite(fw); + } + }); + writer.setName("writer"); + + writer.start(); + reader.start(); + + int c=0; + while(read && write && c<10){ + Thread.sleep(1000); + c++; + } + assertFalse("An error occured in reader or writer",fail); + + } + + volatile boolean read=true; + volatile boolean write=true; + int N=100000; + + private void doRead(final FlowWindow fw){ + System.out.println("Starting reader..."); + try{ + for(int i=0;i<N;i++){ + DataPacket p=null; + while( (p=fw.consumeData())==null){ + Thread.sleep(1); + } + synchronized (p) { + assertEquals(i,p.getMessageNumber()); + } + } + }catch(Throwable ex){ + ex.printStackTrace(); + fail=true; + } + System.out.println("Exiting reader..."); + read=false; + } + + private void doWrite(final FlowWindow fw){ + System.out.println("Starting writer..."); + DataPacket p=null; + try{ + for(int i=0;i<N;i++){ + p=null; + do{ + p=fw.getForProducer(); + if(p!=null){ + synchronized(p){ + p.setData(("test"+i).getBytes()); + p.setMessageNumber(i); + fw.produce(); + } + } + }while(p==null); + } + }catch(Exception ex){ + ex.printStackTrace(); + System.out.println("ERROR****"); + fail=true; + } + System.out.println("Exiting writer..."); + write=false; + } + +} Property changes on: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java ___________________________________________________________________ Added: svn:mime-type + text/plain This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2011-01-06 16:13:38
|
Revision: 53 http://udt-java.svn.sourceforge.net/udt-java/?rev=53&view=rev Author: bschuller Date: 2011-01-06 16:13:32 +0000 (Thu, 06 Jan 2011) Log Message: ----------- Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTClient.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/util/MeanValue.java udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/performance/UDPTest.java Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-01-06 16:13:32 UTC (rev 53) @@ -117,12 +117,17 @@ if(localPort>0)this.port = localPort; else port=dgSocket.getLocalPort(); + configureSocket(); + } + + protected void configureSocket()throws SocketException{ //set a time out to avoid blocking in doReceive() dgSocket.setSoTimeout(100000); //buffer size dgSocket.setReceiveBufferSize(128*1024); + dgSocket.setReuseAddress(false); } - + /** * bind to the default network interface on the machine * @@ -237,8 +242,6 @@ private long lastDestID=-1; private UDTSession lastSession; - //MeanValue v=new MeanValue("receiver processing ",true, 256); - private int n=0; private final Object lock=new Object(); @@ -247,13 +250,10 @@ while(!stopped){ try{ try{ - //v.end(); //will block until a packet is received or timeout has expired dgSocket.receive(dp); - //v.begin(); - Destination peer=new Destination(dp.getAddress(), dp.getPort()); int l=dp.getLength(); UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); Modified: udt-java/trunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTClient.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/UDTClient.java 2011-01-06 16:13:32 UTC (rev 53) @@ -93,14 +93,19 @@ /** * sends the given data asynchronously * - * @param data + * @param data - the data to send * @throws IOException - * @throws InterruptedException */ - public void send(byte[]data)throws IOException, InterruptedException{ + public void send(byte[]data)throws IOException{ clientSession.getSocket().doWrite(data); } + /** + * sends the given data and waits for acknowledgement + * @param data - the data to send + * @throws IOException + * @throws InterruptedException if interrupted while waiting for ack + */ public void sendBlocking(byte[]data)throws IOException, InterruptedException{ clientSession.getSocket().doWriteBlocking(data); } Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-01-06 16:13:32 UTC (rev 53) @@ -183,13 +183,13 @@ private MeanValue dataProcessTime; private void initMetrics(){ if(!storeStatistics)return; - dgReceiveInterval=new MeanValue("UDT receive interval"); + dgReceiveInterval=new MeanValue("RECEIVER: UDT receive interval"); statistics.addMetric(dgReceiveInterval); - dataPacketInterval=new MeanValue("Data packet interval"); + dataPacketInterval=new MeanValue("RECEIVER: Data packet interval"); statistics.addMetric(dataPacketInterval); - processTime=new MeanValue("UDT packet process time"); + processTime=new MeanValue("RECEIVER: UDT packet process time"); statistics.addMetric(processTime); - dataProcessTime=new MeanValue("Data packet process time"); + dataProcessTime=new MeanValue("RECEIVER: Data packet process time"); statistics.addMetric(dataProcessTime); } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-01-06 16:13:32 UTC (rev 53) @@ -125,7 +125,7 @@ statistics=session.getStatistics(); senderLossList=new SenderLossList(); sendBuffer=new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(),0.75f,2); - sendQueue = new ArrayBlockingQueue<DataPacket>(1000); + sendQueue = new ArrayBlockingQueue<DataPacket>(session.getFlowWindowSize(), /*fairness*/ true); lastAckSequenceNumber=session.getInitialSequenceNumber(); currentSequenceNumber=session.getInitialSequenceNumber()-1; waitForAckLatch.set(new CountDownLatch(1)); @@ -140,11 +140,11 @@ private MeanThroughput throughput; private void initMetrics(){ if(!storeStatistics)return; - dgSendTime=new MeanValue("Datagram send time"); + dgSendTime=new MeanValue("SENDER: Datagram send time"); statistics.addMetric(dgSendTime); - dgSendInterval=new MeanValue("Datagram send interval"); + dgSendInterval=new MeanValue("SENDER: Datagram send interval"); statistics.addMetric(dgSendInterval); - throughput=new MeanThroughput("Throughput", session.getDatagramSize()); + throughput=new MeanThroughput("SENDER: Throughput", session.getDatagramSize()); statistics.addMetric(throughput); } @@ -338,7 +338,7 @@ if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() && unAcknowledged<session.getFlowWindowSize()){ //check for application data - DataPacket dp=sendQueue.poll(Util.SYN,TimeUnit.MICROSECONDS); + DataPacket dp=sendQueue.poll(); if(dp!=null){ send(dp); largestSentSequenceNumber=dp.getPacketSequenceNumber(); Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-01-06 16:13:32 UTC (rev 53) @@ -33,6 +33,8 @@ package udt.sender; import java.util.LinkedList; +import udt.util.MeanValue; + /** * stores the sequence number of the lost packets in increasing order */ @@ -49,15 +51,15 @@ public void insert(Long obj){ synchronized (backingList) { - if(!backingList.contains(obj)){ - for(int i=0;i<backingList.size();i++){ - if(obj<backingList.get(i)){ - backingList.add(i,obj); - return; - } + for(int i=0;i<backingList.size();i++){ + Long entry=backingList.get(i); + if(obj<entry){ + backingList.add(i,obj); + return; } - backingList.add(obj); + else if(obj==entry)return; } + backingList.add(obj); } } @@ -69,7 +71,7 @@ return backingList.poll(); } } - + public boolean isEmpty(){ return backingList.isEmpty(); } Modified: udt-java/trunk/src/main/java/udt/util/MeanValue.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/MeanValue.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/util/MeanValue.java 2011-01-06 16:13:32 UTC (rev 53) @@ -9,6 +9,8 @@ public class MeanValue { private double mean=0; + private double max=0; + private double min=0; private int n=0; @@ -43,9 +45,15 @@ public void addValue(double value){ mean=(mean*n+value)/(n+1); n++; + max=Math.max(max, value); + min=Math.min(max, value); + if(verbose && n % nValue == 0){ - if(msg!=null)System.out.println(msg+" "+getFormattedMean()); - else System.out.println(name+getFormattedMean()); + if(msg!=null)System.out.println(msg+" "+get()); + else System.out.println(name+" "+get()); + + max=0; + min=0; } } @@ -57,6 +65,10 @@ return format.format(getMean()); } + public String get(){ + return format.format(getMean())+" max="+format.format(max)+" min="+format.format(min); + } + public void clear(){ mean=0; n=0; Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2011-01-06 16:13:32 UTC (rev 53) @@ -44,7 +44,6 @@ lock=new ReentrantLock(false); notEmpty=lock.newCondition(); highestReadSequenceNumber=SequenceNumber.decrement(initialSequenceNumber); - System.out.println("SIZE: "+size); } public boolean offer(AppData data){ @@ -121,14 +120,6 @@ } else return null; } - // else{ - // System.out.println("empty HEAD at pos="+readPosition); - // try{ - // Thread.sleep(1000); - // Thread.yield(); - // }catch(InterruptedException e){}; - // } - return r; } Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-01-06 16:13:32 UTC (rev 53) @@ -104,6 +104,12 @@ } long size=decode(sizeInfo, 0); + Boolean devNull=Boolean.getBoolean("udt.dev.null"); + if(devNull){ + while(true)Thread.sleep(10000); + } + + File file=new File(new String(localFile)); System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">"); FileOutputStream fos=new FileOutputStream(file); Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-01-06 16:13:32 UTC (rev 53) @@ -21,7 +21,7 @@ boolean running=false; //how many - int num_packets=200; + int num_packets=300; //how large is a single packet int size=1*1024*1024; @@ -32,6 +32,8 @@ public void test1()throws Exception{ Logger.getLogger("udt").setLevel(Level.INFO); +// System.setProperty("udt.receiver.storeStatistics","true"); +// System.setProperty("udt.sender.storeStatistics","true"); UDTReceiver.dropRate=0; TIMEOUT=Integer.MAX_VALUE; doTest(); @@ -109,6 +111,10 @@ Runnable serverProcess=new Runnable(){ public void run(){ try{ + Boolean devNull=Boolean.getBoolean("udt.dev.null"); + if(devNull){ + while(true)Thread.sleep(10000); + } MessageDigest md5=MessageDigest.getInstance("MD5"); long start=System.currentTimeMillis(); UDTSocket s=serverSocket.accept(); Modified: udt-java/trunk/src/test/java/udt/performance/UDPTest.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2010-11-11 21:56:26 UTC (rev 52) +++ udt-java/trunk/src/test/java/udt/performance/UDPTest.java 2011-01-06 16:13:32 UTC (rev 53) @@ -23,8 +23,10 @@ public void test1()throws Exception{ runServer(); runThirdThread(); + //client socket DatagramSocket s=new DatagramSocket(12345); + //generate a test array with random content N=num_packets*packetSize; byte[]data=new byte[packetSize]; @@ -34,32 +36,29 @@ dp.setAddress(InetAddress.getByName("localhost")); dp.setPort(65321); System.out.println("Sending "+num_packets+" data blocks of <"+packetSize+"> bytes"); - MeanValue v=new MeanValue("Datagram send time",false); - MeanValue v2=new MeanValue("Datagram send interval",false); - MeanValue v3=new MeanValue("Encoding time",false); + MeanValue dgSendTime=new MeanValue("Datagram send time",false); + MeanValue dgSendInterval=new MeanValue("Datagram send interval",false); for(int i=0;i<num_packets;i++){ DataPacket p=new DataPacket(); p.setData(data); - v3.begin(); dp.setData(p.getEncoded()); - v3.end(); - v2.end(); - v.begin(); + dgSendInterval.end(); + dgSendTime.begin(); s.send(dp); - v.end(); - v2.begin(); + dgSendTime.end(); + dgSendInterval.begin(); } System.out.println("Finished sending."); while(serverRunning)Thread.sleep(10); System.out.println("Server stopped."); long end=System.currentTimeMillis(); System.out.println("Done. Sending "+N/1024/1024+" Mbytes took "+(end-start)+" ms"); - System.out.println("Rate "+N/1000/(end-start)+" Mbytes/sec"); + float rate=N/1000/(end-start); + System.out.println("Rate "+rate+" Mbytes/sec "+(rate*8)+ " Mbit/sec"); System.out.println("Rate "+num_packets+" packets/sec"); - System.out.println("Mean send time "+v.getFormattedMean()+" microsec"); - System.out.println("Mean send interval "+v2.getFormattedMean()+" microsec"); - System.out.println("Datapacket encoding time "+v3.getFormattedMean()+" microsec"); + System.out.println("Mean send time "+dgSendTime.get()); + System.out.println("Mean send interval "+dgSendInterval.get()); System.out.println("Server received: "+total); } @@ -79,6 +78,7 @@ while(true){ serverSocket.receive(dp); handoff.offer(dp); + total+=dp.getLength(); } } catch(Exception e){ @@ -117,5 +117,5 @@ t.start(); } - + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2010-11-11 23:22:55
|
Revision: 52 http://udt-java.svn.sourceforge.net/udt-java/?rev=52&view=rev Author: bschuller Date: 2010-11-11 21:56:26 +0000 (Thu, 11 Nov 2010) Log Message: ----------- fix sender loss list (entries were not ordered) Modified Paths: -------------- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/test/java/udt/TestList.java Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-09-23 09:09:56 UTC (rev 51) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2010-11-11 21:56:26 UTC (rev 52) @@ -51,7 +51,7 @@ synchronized (backingList) { if(!backingList.contains(obj)){ for(int i=0;i<backingList.size();i++){ - if(obj<backingList.getFirst()){ + if(obj<backingList.get(i)){ backingList.add(i,obj); return; } Modified: udt-java/trunk/src/test/java/udt/TestList.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestList.java 2010-09-23 09:09:56 UTC (rev 51) +++ udt-java/trunk/src/test/java/udt/TestList.java 2010-11-11 21:56:26 UTC (rev 52) @@ -88,6 +88,10 @@ assertEquals(3,l.size()); Long oldest=l.getFirstEntry(); assertEquals(C,oldest); + oldest=l.getFirstEntry(); + assertEquals(A,oldest); + oldest=l.getFirstEntry(); + assertEquals(B,oldest); } public void testReceiverInputQueue(){ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2010-09-23 09:10:02
|
Revision: 51 http://udt-java.svn.sourceforge.net/udt-java/?rev=51&view=rev Author: bschuller Date: 2010-09-23 09:09:56 +0000 (Thu, 23 Sep 2010) Log Message: ----------- update version number Modified Paths: -------------- udt-java/trunk/pom.xml Modified: udt-java/trunk/pom.xml =================================================================== --- udt-java/trunk/pom.xml 2010-09-17 11:04:39 UTC (rev 50) +++ udt-java/trunk/pom.xml 2010-09-23 09:09:56 UTC (rev 51) @@ -5,7 +5,7 @@ <artifactId>udt-java</artifactId> <packaging>jar</packaging> <name>UDT Java implementation</name> - <version>0.5-SNAPSHOT</version> + <version>0.6-SNAPSHOT</version> <url>http://sourceforge.net/projects/udt-java</url> <developers> <developer> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bsc...@us...> - 2010-09-17 11:04:46
|
Revision: 50 http://udt-java.svn.sourceforge.net/udt-java/?rev=50&view=rev Author: bschuller Date: 2010-09-17 11:04:39 +0000 (Fri, 17 Sep 2010) Log Message: ----------- Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java udt-java/trunk/src/main/java/udt/util/Util.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-09-17 11:04:39 UTC (rev 50) @@ -386,14 +386,14 @@ protected void onDataPacketReceived(DataPacket dp)throws IOException{ long currentSequenceNumber = dp.getPacketSequenceNumber(); - //check whether to drop this packet + //for TESTING : check whether to drop this packet // n++; // //if(dropRate>0 && n % dropRate == 0){ -// if(n==666){ -// logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING"); -// return; -// } -// +// if(n % 1111 == 0){ +// logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING"); +// return; +// } +// //} boolean OK=session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData()); if(!OK){ //need to drop packet... Modified: udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/main/java/udt/util/ReceiveBuffer.java 2010-09-17 11:04:39 UTC (rev 50) @@ -61,6 +61,7 @@ int insert=offset% size; buffer[insert]=data; numValidChunks.incrementAndGet(); + notEmpty.signal(); return true; }finally{ lock.unlock(); Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-09-17 11:04:39 UTC (rev 50) @@ -32,8 +32,10 @@ package udt.util; +import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; +import java.io.OutputStream; import java.net.InetAddress; import java.text.NumberFormat; @@ -101,22 +103,16 @@ total+=r; } long size=decode(sizeInfo, 0); - if(verbose){ - StringBuilder sb=new StringBuilder(); - for(int i=0;i<sizeInfo.length;i++){ - sb.append(Integer.toString(sizeInfo[i])); - sb.append(" "); - } - System.out.println("[ReceiveFile] Size info: "+sb.toString()); - } + File file=new File(new String(localFile)); System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">"); FileOutputStream fos=new FileOutputStream(file); + OutputStream os=new BufferedOutputStream(fos,1024*1024); try{ System.out.println("[ReceiveFile] Reading <"+size+"> bytes."); long start = System.currentTimeMillis(); //and read the file data - Util.copy(in, fos, size, false); + Util.copy(in, os, size, false); long end = System.currentTimeMillis(); double rate=1000.0*size/1024/1024/(end-start); System.out.println("[ReceiveFile] Rate: "+format.format(rate)+" MBytes/sec. " Modified: udt-java/trunk/src/main/java/udt/util/Util.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/Util.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/main/java/udt/util/Util.java 2010-09-17 11:04:39 UTC (rev 50) @@ -134,6 +134,7 @@ c=source.read(buf); if(c<0)break; read+=c; + //System.out.println("writing <"+c+"> bytes"); target.write(buf, 0, c); if(flush)target.flush(); if(read>=size && size>-1)break; Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2010-09-17 11:04:39 UTC (rev 50) @@ -21,7 +21,7 @@ boolean running=false; //how many - int num_packets=500; + int num_packets=200; //how large is a single packet int size=1*1024*1024; Modified: udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java =================================================================== --- udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java 2010-09-15 19:22:18 UTC (rev 49) +++ udt-java/trunk/src/test/java/udt/util/TestReceiveBuffer.java 2010-09-17 11:04:39 UTC (rev 50) @@ -16,79 +16,79 @@ byte[]test1="test1".getBytes(); byte[]test2="test2".getBytes(); byte[]test3="test3".getBytes(); - + b.offer(new AppData(1l,test1)); b.offer(new AppData(2l,test2)); b.offer(new AppData(3l,test3)); - + AppData a=b.poll(); assertEquals(1l,a.getSequenceNumber()); - + a=b.poll(); assertEquals(2l,a.getSequenceNumber()); - + a=b.poll(); assertEquals(3l,a.getSequenceNumber()); - + assertNull(b.poll()); } - + public void testOutOfOrder(){ ReceiveBuffer b=new ReceiveBuffer(16,1); byte[]test1="test1".getBytes(); byte[]test2="test2".getBytes(); byte[]test3="test3".getBytes(); - + b.offer(new AppData(3l,test3)); b.offer(new AppData(2l,test2)); b.offer(new AppData(1l,test1)); - + AppData a=b.poll(); assertEquals(1l,a.getSequenceNumber()); - + a=b.poll(); assertEquals(2l,a.getSequenceNumber()); - + a=b.poll(); assertEquals(3l,a.getSequenceNumber()); - + assertNull(b.poll()); } - + public void testInterleaved(){ ReceiveBuffer b=new ReceiveBuffer(16,1); byte[]test1="test1".getBytes(); byte[]test2="test2".getBytes(); byte[]test3="test3".getBytes(); - + b.offer(new AppData(3l,test3)); - + b.offer(new AppData(1l,test1)); - + AppData a=b.poll(); assertEquals(1l,a.getSequenceNumber()); - + assertNull(b.poll()); - + b.offer(new AppData(2l,test2)); - + a=b.poll(); assertEquals(2l,a.getSequenceNumber()); - + a=b.poll(); assertEquals(3l,a.getSequenceNumber()); } - + public void testOverflow(){ ReceiveBuffer b=new ReceiveBuffer(4,1); - + for(int i=0; i<3; i++){ b.offer(new AppData(i+1,"test".getBytes())); } for(int i=0; i<3; i++){ assertEquals(i+1, b.poll().getSequenceNumber()); } - + for(int i=0; i<3; i++){ b.offer(new AppData(i+4,"test".getBytes())); } @@ -96,13 +96,13 @@ assertEquals(i+4, b.poll().getSequenceNumber()); } } - - + + public void testTimedPoll()throws Exception{ final ReceiveBuffer b=new ReceiveBuffer(4,1); - + Runnable write=new Runnable(){ - + public void run(){ try{ for(int i=0; i<5; i++){ @@ -115,7 +115,7 @@ } } }; - + Callable<String> reader=new Callable<String>(){ public String call() throws Exception { for(int i=0; i<5; i++){ @@ -131,12 +131,60 @@ return "OK."; } }; + + ScheduledExecutorService es=Executors.newScheduledThreadPool(2); + es.execute(write); + Future<String>res=es.submit(reader); + res.get(); + es.shutdownNow(); + } + + + volatile boolean poll=false; + + public void testTimedPoll2()throws Exception{ + final ReceiveBuffer b=new ReceiveBuffer(4,1); + Runnable write=new Runnable(){ + + public void run(){ + try{ + Thread.sleep(2979); + System.out.println("PUT"); + while(!poll)Thread.sleep(10); + b.offer(new AppData(1,"test".getBytes())); + System.out.println("... PUT OK"); + } + catch(Exception e){ + e.printStackTrace(); + fail(); + } + } + }; + + Callable<String> reader=new Callable<String>(){ + public String call() throws Exception { + AppData r=null; + do{ + try{ + poll=true; + System.out.println("POLL"); + r=b.poll(1000, TimeUnit.MILLISECONDS); + poll=false; + if(r!=null)System.out.println("... POLL OK"); + else System.out.println("... nothing."); + }catch(InterruptedException ie){ + ie.printStackTrace(); + } + }while(r==null); + return "OK."; + } + }; + ScheduledExecutorService es=Executors.newScheduledThreadPool(2); es.execute(write); Future<String>res=es.submit(reader); res.get(); es.shutdownNow(); } - } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |