[Udt-java-commits] SF.net SVN: udt-java:[61] udt-java/skunk/src/main/java/udt
Status: Alpha
Brought to you by:
bschuller
From: <pe...@us...> - 2011-08-05 06:54:32
|
Revision: 61 http://udt-java.svn.sourceforge.net/udt-java/?rev=61&view=rev Author: pete_ Date: 2011-08-05 06:54:24 +0000 (Fri, 05 Aug 2011) Log Message: ----------- Renamed UDPEndPoint to UDPMultiplexer, this will make the implementation easier to understand for new comers who've read UDTv4: Improvements in Performance and Usability. Modified Paths: -------------- udt-java/skunk/src/main/java/udt/ClientSession.java udt-java/skunk/src/main/java/udt/ServerSession.java udt-java/skunk/src/main/java/udt/UDTClient.java udt-java/skunk/src/main/java/udt/UDTCongestionControl.java udt-java/skunk/src/main/java/udt/UDTReceiver.java udt-java/skunk/src/main/java/udt/UDTSender.java udt-java/skunk/src/main/java/udt/UDTServerSocket.java udt-java/skunk/src/main/java/udt/UDTSession.java udt-java/skunk/src/main/java/udt/UDTSocket.java udt-java/skunk/src/main/java/udt/util/Util.java Added Paths: ----------- udt-java/skunk/src/main/java/udt/UDPMultiplexer.java Removed Paths: ------------- udt-java/skunk/src/main/java/udt/UDPEndPoint.java Modified: udt-java/skunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 06:54:24 UTC (rev 61) @@ -50,9 +50,9 @@ private static final Logger logger=Logger.getLogger(ClientSession.class.getName()); - private UDPEndPoint endPoint; + private UDPMultiplexer endPoint; - public ClientSession(UDPEndPoint endPoint, UDTSocketAddress dest)throws SocketException{ + public ClientSession(UDPMultiplexer endPoint, UDTSocketAddress dest)throws SocketException{ super("ClientSession localPort="+endPoint.getLocalPort(),dest); this.endPoint=endPoint; logger.info("Created "+toString()); Modified: udt-java/skunk/src/main/java/udt/ServerSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 06:54:24 UTC (rev 61) @@ -51,12 +51,12 @@ private static final Logger logger=Logger.getLogger(ServerSession.class.getName()); - private final UDPEndPoint endPoint; + private final UDPMultiplexer endPoint; //last received packet (for testing purposes) private UDTPacket lastPacket; - public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{ + public ServerSession(DatagramPacket dp, UDPMultiplexer endPoint)throws SocketException,UnknownHostException{ super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new UDTSocketAddress(dp.getAddress(),dp.getPort(),0)); this.endPoint=endPoint; logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort()); Deleted: udt-java/skunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 06:54:24 UTC (rev 61) @@ -1,540 +0,0 @@ -/********************************************************************************* - * Copyright (c) 2010 Forschungszentrum Juelich GmbH - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * (1) Redistributions of source code must retain the above copyright notice, - * this list of conditions and the disclaimer at the end. Redistributions in - * binary form must reproduce the above copyright notice, this list of - * conditions and the following disclaimer in the documentation and/or other - * materials provided with the distribution. - * - * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its - * contributors may be used to endorse or promote products derived from this - * software without specific prior written permission. - * - * DISCLAIMER - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - *********************************************************************************/ - -package udt; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.net.UnknownHostException; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.WeakHashMap; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.logging.Level; -import java.util.logging.Logger; - -import udt.packets.ConnectionHandshake; -import udt.packets.UDTSocketAddress; -import udt.packets.PacketFactory; -import udt.util.ObjectPool; -import udt.util.UDTThreadFactory; - -/** - * the UDPEndpoint takes care of sending and receiving UDP network packets, - * dispatching them to the correct {@link UDTSession} - */ -public class UDPEndPoint { - - //class fields - private static final Logger logger=Logger.getLogger(ClientSession.class.getName()); - public static final int DATAGRAM_SIZE=1400; - - - //class methods - private static final WeakHashMap<SocketAddress, UDPEndPoint> localEndpoints - = new WeakHashMap<SocketAddress, UDPEndPoint>(); - - public static UDPEndPoint get(DatagramSocket socket){ - SocketAddress localInetSocketAddress = null; - UDPEndPoint result = null; - if ( socket.isBound()){ - SocketAddress sa = socket.getLocalSocketAddress(); - if ( sa instanceof InetSocketAddress ){ - localInetSocketAddress = (InetSocketAddress) sa; - } else { - // Must be a special DatagramSocket impl or extended. - localInetSocketAddress = - new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); - } - synchronized (localEndpoints){ - result = localEndpoints.get(localInetSocketAddress); - } - } - if (result != null) return result; - try { - result = new UDPEndPoint(socket); - if (localInetSocketAddress == null){ - // The DatagramSocket was unbound, it should be bound now. - localInetSocketAddress = - new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); - } - } catch (SocketException ex) { - Logger.getLogger(UDPEndPoint.class.getName()).log(Level.SEVERE, null, ex); - } - if (result != null){ - synchronized (localEndpoints){ - UDPEndPoint exists = localEndpoints.get(localInetSocketAddress); - if (exists != null && exists.getSocket().equals(socket)) result = exists; - // Only cache if a record doesn't already exist. - else if (exists == null) localEndpoints.put(localInetSocketAddress, result); - } - } - return result; // may be null. - } - - - - public static UDPEndPoint get(InetAddress localAddress, int localPort){ - InetSocketAddress localInetSocketAddress = new InetSocketAddress(localAddress, localPort); - return get(localInetSocketAddress); - } - - public static UDPEndPoint get(SocketAddress localSocketAddress){ - InetSocketAddress localInetSocketAddress = null; - if (localSocketAddress instanceof InetSocketAddress){ - localInetSocketAddress = (InetSocketAddress) localSocketAddress; - } else if (localSocketAddress instanceof UDTSocketAddress){ - UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress; - localInetSocketAddress = - new InetSocketAddress(udtSA.getAddress(), udtSA.getPort()); - } - if (localInetSocketAddress == null) return null; - UDPEndPoint result = null; - synchronized (localEndpoints){ - result = localEndpoints.get(localInetSocketAddress); - } - if (result != null) return result; - try { - result = new UDPEndPoint(localInetSocketAddress); - if (localInetSocketAddress.getPort() == 0 || - localInetSocketAddress.getAddress().isAnyLocalAddress()){ - // ephemeral port or wildcard address, bind operation is complete. - localInetSocketAddress = - new InetSocketAddress(result.getLocalAddress(), result.getLocalPort()); - } - } catch (SocketException ex) { - logger.log(Level.SEVERE, null, ex); - } catch (UnknownHostException ex) { - logger.log(Level.SEVERE, null, ex); - } - if (result != null){ - synchronized (localEndpoints){ - UDPEndPoint exists = localEndpoints.get(localInetSocketAddress); - if (exists != null) result = exists; - else localEndpoints.put(localInetSocketAddress, result); - } - } - return result; // may be null. - } - - /** - * Allows a custom endpoint to be added to the pool. - * @param endpoint - */ - public static void put(UDPEndPoint endpoint){ - SocketAddress local = endpoint.getSocket().getLocalSocketAddress(); - synchronized (localEndpoints){ - localEndpoints.put(local, endpoint); - } - } - - //object fields - private final int port; - - private final DatagramSocket dgSocket; - - //active sessions keyed by socket ID - private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>(); - - //last received packet - private UDTPacket lastPacket; - - //if the endpoint is configured for a server socket, - //this queue is used to handoff new UDTSessions to the application - private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>(); - - private final ObjectPool<BlockingQueue<UDTSession>> queuePool - = new ObjectPool<BlockingQueue<UDTSession>>(20); - - private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff - = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120); - - // registered sockets - private final Set<Integer> registeredSockets = new HashSet<Integer>(120); - // registered sockets lock - private final ReadWriteLock rwl = new ReentrantReadWriteLock(); - private final Lock readSocketLock = rwl.readLock(); - private final Lock writeSocketLock = rwl.writeLock(); - - - private boolean serverSocketMode=false; - - //has the endpoint been stopped? - private volatile boolean stopped=false; - - private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000)); - - - /** - * create an endpoint on the given socket - * - * @param socket - a UDP datagram socket - * @throws SocketException - */ - protected UDPEndPoint(DatagramSocket socket) throws SocketException{ - this.dgSocket=socket; - if (!socket.isBound()){ - socket.bind(null); - } - port=dgSocket.getLocalPort(); - } - - /** - * bind to any local port on the given host address - * @param localAddress - * @throws SocketException - * @throws UnknownHostException - */ - private UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{ - this(localAddress,0); - } - - /** - * Bind to the given address and port - * @param localAddress - * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port. - * @throws SocketException - * @throws UnknownHostException - */ - private UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{ - if(localAddress==null){ - dgSocket=new DatagramSocket(localPort, localAddress); - }else{ - dgSocket=new DatagramSocket(localPort); - } - if(localPort>0)this.port = localPort; - else port=dgSocket.getLocalPort(); - - configureSocket(); - } - - private UDPEndPoint (InetSocketAddress localSocketAddress) - throws SocketException, UnknownHostException { - dgSocket = new DatagramSocket(localSocketAddress); - port = dgSocket.getLocalPort(); - configureSocket(); - } - - protected void configureSocket()throws SocketException{ - //set a time out to avoid blocking in doReceive() - dgSocket.setSoTimeout(100000); - //buffer size - dgSocket.setReceiveBufferSize(128*1024); - dgSocket.setReuseAddress(false); - } - - /** - * bind to the default network interface on the machine - * - * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port. - * @throws SocketException - * @throws UnknownHostException - */ - public UDPEndPoint(int localPort)throws SocketException, UnknownHostException{ - this(null,localPort); - } - - /** - * bind to an ephemeral port on the default network interface on the machine - * - * @throws SocketException - * @throws UnknownHostException - */ - public UDPEndPoint()throws SocketException, UnknownHostException{ - this(null,0); - } - - /** - * start the endpoint. If the serverSocketModeEnabled flag is <code>true</code>, - * a new connection can be handed off to an application. The application needs to - * call #accept() to get the socket - * @param serverSocketModeEnabled - */ - public void start(boolean serverSocketModeEnabled){ - serverSocketMode=serverSocketModeEnabled; - //start receive thread - Runnable receive=new Runnable(){ - public void run(){ - try{ - doReceive(); - }catch(Exception ex){ - logger.log(Level.WARNING,"",ex); - } - } - }; - Thread t=UDTThreadFactory.get().newThread(receive); - t.setName("UDPEndpoint-"+t.getName()); - t.setDaemon(true); - t.start(); - logger.info("UDTEndpoint started."); - } - - public void start(){ - start(false); - } - - public void stop(){ - stopped=true; - dgSocket.close(); - } - - /** - * Provides assistance to a socket to determine a random socket id, - * every caller receives a unique value. This value is unique at the - * time of calling, however it may not be at registration time. - * - * This socketID has not been registered, all socket ID's must be - * registered or connection will fail. - * @return - */ - public int getUniqueSocketID(){ - Integer socketID = nextSocketID.getAndIncrement(); - try{ - readSocketLock.lock(); - while (registeredSockets.contains(socketID)){ - socketID = nextSocketID.getAndIncrement(); - } - return socketID; // should we register it? - } finally { - readSocketLock.unlock(); - } - } - - void registerSocketID(int socketID, UDTSocket socket) throws SocketException { - if (!equals(socket.getEndpoint())) throw new SocketException ( - "Socket doesn't originate for this endpoint: " - + socket.toString()); - try { - writeSocketLock.lock(); - if (registeredSockets.contains(socketID)){ - throw new SocketException("Already registered, Socket ID: " +socketID); - } - registeredSockets.add(socketID); - }finally{ - writeSocketLock.unlock(); - } - } - - /** - * @return the port which this client is bound to - */ - public int getLocalPort() { - return this.dgSocket.getLocalPort(); - } - /** - * @return Gets the local address to which the socket is bound - */ - public InetAddress getLocalAddress(){ - return this.dgSocket.getLocalAddress(); - } - - DatagramSocket getSocket(){ - return dgSocket; - } - - UDTPacket getLastPacket(){ - return lastPacket; - } - - public void addSession(Integer destinationID,UDTSession session){ - logger.log(Level.INFO, "Storing session <{0}>", destinationID); - sessions.put(destinationID, session); - } - - public UDTSession getSession(Long destinationID){ - return sessions.get(destinationID); - } - - public Collection<UDTSession> getSessions(){ - return sessions.values(); - } - - /** - * wait the given time for a new connection - * @param timeout - the time to wait - * @param unit - the {@link TimeUnit} - * @param socketID - the socket id. - * @return a new {@link UDTSession} - * @throws InterruptedException - */ - protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{ - //return sessionHandoff.poll(timeout, unit); - BlockingQueue<UDTSession> session = handoff.get(socketID); - try { - if (session == null){ - session = queuePool.get(); - if (session == null) { - session = new ArrayBlockingQueue<UDTSession>(1); - } - BlockingQueue<UDTSession> existed = handoff.putIfAbsent(socketID,session); - if (existed != null){ - session = existed; - } - } - return session.poll(timeout, unit); - } finally { - boolean removed = handoff.remove(socketID, session); - if (removed){ - session.clear(); - queuePool.accept(session); - } - } - } - - - final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE); - - /** - * single receive, run in the receiverThread, see {@link #start()} - * <ul> - * <li>Receives UDP packets from the network</li> - * <li>Converts them to UDT packets</li> - * <li>dispatches the UDT packets according to their destination ID.</li> - * </ul> - * @throws IOException - */ - private long lastDestID=-1; - private UDTSession lastSession; - - private int n=0; - - private final Object lock=new Object(); - - protected void doReceive()throws IOException{ - while(!stopped){ - try{ - //will block until a packet is received or timeout has expired - dgSocket.receive(dp); - UDTSocketAddress peer= null; - int l=dp.getLength(); - UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); - lastPacket=packet; - //handle connection handshake - if(packet.isConnectionHandshake()){ - synchronized(lock){ - Long id=Long.valueOf(packet.getDestinationID()); - UDTSession session=sessions.get(id); - if(session==null){ // What about DOS? - session=new ServerSession(dp,this); - addSession(session.getSocketID(),session); - //TODO need to check peer to avoid duplicate server session - if(serverSocketMode){ - logger.fine("Pooling new request."); -// sessionHandoff.put(session); // blocking method, what about offer? - BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID()); - if (queue != null){ - boolean success = queue.offer(session); - if (success){ - logger.fine("Request taken for processing."); - } else { - logger.fine("Request discarded, queue full."); - } - } else { - logger.fine("No ServerSocket listening at socketID: " - + session.getSocketID() - + "to answer request"); - } - } - } - peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); - peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(), - ((ConnectionHandshake)packet).getSocketID()); - session.received(packet,peer); - } - } - else{ - //dispatch to existing session - long dest=packet.getDestinationID(); - UDTSession session; - if(dest==lastDestID){ - session=lastSession; - } - else{ - session=sessions.get(dest); - lastSession=session; - lastDestID=dest; - } - if(session==null){ - n++; - if(n%100==1){ - logger.warning("Unknown session <"+dest - +"> requested from <"+peer+"> packet type " - +packet.getClass().getName()); - } - } - else{ - session.received(packet,peer); - } - } - }catch(SocketException ex){ - logger.log(Level.INFO, "SocketException: "+ex.getMessage()); - }catch(SocketTimeoutException ste){ - //can safely ignore... we will retry until the endpoint is stopped - }catch(Exception ex){ - logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); - } - } - } - - protected void doSend(UDTPacket packet)throws IOException{ - byte[]data=packet.getEncoded(); - DatagramPacket dgp = packet.getSession().getDatagram(); - dgp.setData(data); - dgSocket.send(dgp); - } - - public String toString(){ - return "UDPEndpoint port="+port; - } - - public void sendRaw(DatagramPacket p)throws IOException{ - dgSocket.send(p); - } -} Copied: udt-java/skunk/src/main/java/udt/UDPMultiplexer.java (from rev 60, udt-java/skunk/src/main/java/udt/UDPEndPoint.java) =================================================================== --- udt-java/skunk/src/main/java/udt/UDPMultiplexer.java (rev 0) +++ udt-java/skunk/src/main/java/udt/UDPMultiplexer.java 2011-08-05 06:54:24 UTC (rev 61) @@ -0,0 +1,540 @@ +/********************************************************************************* + * Copyright (c) 2010 Forschungszentrum Juelich GmbH + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * (1) Redistributions of source code must retain the above copyright notice, + * this list of conditions and the disclaimer at the end. Redistributions in + * binary form must reproduce the above copyright notice, this list of + * conditions and the following disclaimer in the documentation and/or other + * materials provided with the distribution. + * + * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * DISCLAIMER + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + *********************************************************************************/ + +package udt; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +import udt.packets.ConnectionHandshake; +import udt.packets.UDTSocketAddress; +import udt.packets.PacketFactory; +import udt.util.ObjectPool; +import udt.util.UDTThreadFactory; + +/** + * the UDPMultiplexer takes care of sending and receiving UDP network packets, + * dispatching them to the correct {@link UDTSession} + */ +public class UDPMultiplexer { + + //class fields + private static final Logger logger=Logger.getLogger(ClientSession.class.getName()); + public static final int DATAGRAM_SIZE=1400; + + + //class methods + private static final WeakHashMap<SocketAddress, UDPMultiplexer> localEndpoints + = new WeakHashMap<SocketAddress, UDPMultiplexer>(); + + public static UDPMultiplexer get(DatagramSocket socket){ + SocketAddress localInetSocketAddress = null; + UDPMultiplexer result = null; + if ( socket.isBound()){ + SocketAddress sa = socket.getLocalSocketAddress(); + if ( sa instanceof InetSocketAddress ){ + localInetSocketAddress = (InetSocketAddress) sa; + } else { + // Must be a special DatagramSocket impl or extended. + localInetSocketAddress = + new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); + } + synchronized (localEndpoints){ + result = localEndpoints.get(localInetSocketAddress); + } + } + if (result != null) return result; + try { + result = new UDPMultiplexer(socket); + if (localInetSocketAddress == null){ + // The DatagramSocket was unbound, it should be bound now. + localInetSocketAddress = + new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); + } + } catch (SocketException ex) { + Logger.getLogger(UDPMultiplexer.class.getName()).log(Level.SEVERE, null, ex); + } + if (result != null){ + synchronized (localEndpoints){ + UDPMultiplexer exists = localEndpoints.get(localInetSocketAddress); + if (exists != null && exists.getSocket().equals(socket)) result = exists; + // Only cache if a record doesn't already exist. + else if (exists == null) localEndpoints.put(localInetSocketAddress, result); + } + } + return result; // may be null. + } + + + + public static UDPMultiplexer get(InetAddress localAddress, int localPort){ + InetSocketAddress localInetSocketAddress = new InetSocketAddress(localAddress, localPort); + return get(localInetSocketAddress); + } + + public static UDPMultiplexer get(SocketAddress localSocketAddress){ + InetSocketAddress localInetSocketAddress = null; + if (localSocketAddress instanceof InetSocketAddress){ + localInetSocketAddress = (InetSocketAddress) localSocketAddress; + } else if (localSocketAddress instanceof UDTSocketAddress){ + UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress; + localInetSocketAddress = + new InetSocketAddress(udtSA.getAddress(), udtSA.getPort()); + } + if (localInetSocketAddress == null) return null; + UDPMultiplexer result = null; + synchronized (localEndpoints){ + result = localEndpoints.get(localInetSocketAddress); + } + if (result != null) return result; + try { + result = new UDPMultiplexer(localInetSocketAddress); + if (localInetSocketAddress.getPort() == 0 || + localInetSocketAddress.getAddress().isAnyLocalAddress()){ + // ephemeral port or wildcard address, bind operation is complete. + localInetSocketAddress = + new InetSocketAddress(result.getLocalAddress(), result.getLocalPort()); + } + } catch (SocketException ex) { + logger.log(Level.SEVERE, null, ex); + } catch (UnknownHostException ex) { + logger.log(Level.SEVERE, null, ex); + } + if (result != null){ + synchronized (localEndpoints){ + UDPMultiplexer exists = localEndpoints.get(localInetSocketAddress); + if (exists != null) result = exists; + else localEndpoints.put(localInetSocketAddress, result); + } + } + return result; // may be null. + } + + /** + * Allows a custom endpoint to be added to the pool. + * @param endpoint + */ + public static void put(UDPMultiplexer endpoint){ + SocketAddress local = endpoint.getSocket().getLocalSocketAddress(); + synchronized (localEndpoints){ + localEndpoints.put(local, endpoint); + } + } + + //object fields + private final int port; + + private final DatagramSocket dgSocket; + + //active sessions keyed by socket ID + private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>(); + + //last received packet + private UDTPacket lastPacket; + + //if the endpoint is configured for a server socket, + //this queue is used to handoff new UDTSessions to the application + private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>(); + + private final ObjectPool<BlockingQueue<UDTSession>> queuePool + = new ObjectPool<BlockingQueue<UDTSession>>(20); + + private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff + = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120); + + // registered sockets + private final Set<Integer> registeredSockets = new HashSet<Integer>(120); + // registered sockets lock + private final ReadWriteLock rwl = new ReentrantReadWriteLock(); + private final Lock readSocketLock = rwl.readLock(); + private final Lock writeSocketLock = rwl.writeLock(); + + + private boolean serverSocketMode=false; + + //has the endpoint been stopped? + private volatile boolean stopped=false; + + private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000)); + + + /** + * create an endpoint on the given socket + * + * @param socket - a UDP datagram socket + * @throws SocketException + */ + protected UDPMultiplexer(DatagramSocket socket) throws SocketException{ + this.dgSocket=socket; + if (!socket.isBound()){ + socket.bind(null); + } + port=dgSocket.getLocalPort(); + } + + /** + * bind to any local port on the given host address + * @param localAddress + * @throws SocketException + * @throws UnknownHostException + */ + private UDPMultiplexer(InetAddress localAddress)throws SocketException, UnknownHostException{ + this(localAddress,0); + } + + /** + * Bind to the given address and port + * @param localAddress + * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port. + * @throws SocketException + * @throws UnknownHostException + */ + private UDPMultiplexer(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{ + if(localAddress==null){ + dgSocket=new DatagramSocket(localPort, localAddress); + }else{ + dgSocket=new DatagramSocket(localPort); + } + if(localPort>0)this.port = localPort; + else port=dgSocket.getLocalPort(); + + configureSocket(); + } + + private UDPMultiplexer (InetSocketAddress localSocketAddress) + throws SocketException, UnknownHostException { + dgSocket = new DatagramSocket(localSocketAddress); + port = dgSocket.getLocalPort(); + configureSocket(); + } + + protected void configureSocket()throws SocketException{ + //set a time out to avoid blocking in doReceive() + dgSocket.setSoTimeout(100000); + //buffer size + dgSocket.setReceiveBufferSize(128*1024); + dgSocket.setReuseAddress(false); + } + + /** + * bind to the default network interface on the machine + * + * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port. + * @throws SocketException + * @throws UnknownHostException + */ + public UDPMultiplexer(int localPort)throws SocketException, UnknownHostException{ + this(null,localPort); + } + + /** + * bind to an ephemeral port on the default network interface on the machine + * + * @throws SocketException + * @throws UnknownHostException + */ + public UDPMultiplexer()throws SocketException, UnknownHostException{ + this(null,0); + } + + /** + * start the endpoint. If the serverSocketModeEnabled flag is <code>true</code>, + * a new connection can be handed off to an application. The application needs to + * call #accept() to get the socket + * @param serverSocketModeEnabled + */ + public void start(boolean serverSocketModeEnabled){ + serverSocketMode=serverSocketModeEnabled; + //start receive thread + Runnable receive=new Runnable(){ + public void run(){ + try{ + doReceive(); + }catch(Exception ex){ + logger.log(Level.WARNING,"",ex); + } + } + }; + Thread t=UDTThreadFactory.get().newThread(receive); + t.setName("UDPEndpoint-"+t.getName()); + t.setDaemon(true); + t.start(); + logger.info("UDTEndpoint started."); + } + + public void start(){ + start(false); + } + + public void stop(){ + stopped=true; + dgSocket.close(); + } + + /** + * Provides assistance to a socket to determine a random socket id, + * every caller receives a unique value. This value is unique at the + * time of calling, however it may not be at registration time. + * + * This socketID has not been registered, all socket ID's must be + * registered or connection will fail. + * @return + */ + public int getUniqueSocketID(){ + Integer socketID = nextSocketID.getAndIncrement(); + try{ + readSocketLock.lock(); + while (registeredSockets.contains(socketID)){ + socketID = nextSocketID.getAndIncrement(); + } + return socketID; // should we register it? + } finally { + readSocketLock.unlock(); + } + } + + void registerSocketID(int socketID, UDTSocket socket) throws SocketException { + if (!equals(socket.getEndpoint())) throw new SocketException ( + "Socket doesn't originate for this endpoint: " + + socket.toString()); + try { + writeSocketLock.lock(); + if (registeredSockets.contains(socketID)){ + throw new SocketException("Already registered, Socket ID: " +socketID); + } + registeredSockets.add(socketID); + }finally{ + writeSocketLock.unlock(); + } + } + + /** + * @return the port which this client is bound to + */ + public int getLocalPort() { + return this.dgSocket.getLocalPort(); + } + /** + * @return Gets the local address to which the socket is bound + */ + public InetAddress getLocalAddress(){ + return this.dgSocket.getLocalAddress(); + } + + DatagramSocket getSocket(){ + return dgSocket; + } + + UDTPacket getLastPacket(){ + return lastPacket; + } + + public void addSession(Integer destinationID,UDTSession session){ + logger.log(Level.INFO, "Storing session <{0}>", destinationID); + sessions.put(destinationID, session); + } + + public UDTSession getSession(Long destinationID){ + return sessions.get(destinationID); + } + + public Collection<UDTSession> getSessions(){ + return sessions.values(); + } + + /** + * wait the given time for a new connection + * @param timeout - the time to wait + * @param unit - the {@link TimeUnit} + * @param socketID - the socket id. + * @return a new {@link UDTSession} + * @throws InterruptedException + */ + protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{ + //return sessionHandoff.poll(timeout, unit); + BlockingQueue<UDTSession> session = handoff.get(socketID); + try { + if (session == null){ + session = queuePool.get(); + if (session == null) { + session = new ArrayBlockingQueue<UDTSession>(1); + } + BlockingQueue<UDTSession> existed = handoff.putIfAbsent(socketID,session); + if (existed != null){ + session = existed; + } + } + return session.poll(timeout, unit); + } finally { + boolean removed = handoff.remove(socketID, session); + if (removed){ + session.clear(); + queuePool.accept(session); + } + } + } + + + final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE); + + /** + * single receive, run in the receiverThread, see {@link #start()} + * <ul> + * <li>Receives UDP packets from the network</li> + * <li>Converts them to UDT packets</li> + * <li>dispatches the UDT packets according to their destination ID.</li> + * </ul> + * @throws IOException + */ + private long lastDestID=-1; + private UDTSession lastSession; + + private int n=0; + + private final Object lock=new Object(); + + protected void doReceive()throws IOException{ + while(!stopped){ + try{ + //will block until a packet is received or timeout has expired + dgSocket.receive(dp); + UDTSocketAddress peer= null; + int l=dp.getLength(); + UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); + lastPacket=packet; + //handle connection handshake + if(packet.isConnectionHandshake()){ + synchronized(lock){ + Long id=Long.valueOf(packet.getDestinationID()); + UDTSession session=sessions.get(id); + if(session==null){ // What about DOS? + session=new ServerSession(dp,this); + addSession(session.getSocketID(),session); + //TODO need to check peer to avoid duplicate server session + if(serverSocketMode){ + logger.fine("Pooling new request."); +// sessionHandoff.put(session); // blocking method, what about offer? + BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID()); + if (queue != null){ + boolean success = queue.offer(session); + if (success){ + logger.fine("Request taken for processing."); + } else { + logger.fine("Request discarded, queue full."); + } + } else { + logger.fine("No ServerSocket listening at socketID: " + + session.getSocketID() + + "to answer request"); + } + } + } + peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); + peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(), + ((ConnectionHandshake)packet).getSocketID()); + session.received(packet,peer); + } + } + else{ + //dispatch to existing session + long dest=packet.getDestinationID(); + UDTSession session; + if(dest==lastDestID){ + session=lastSession; + } + else{ + session=sessions.get(dest); + lastSession=session; + lastDestID=dest; + } + if(session==null){ + n++; + if(n%100==1){ + logger.warning("Unknown session <"+dest + +"> requested from <"+peer+"> packet type " + +packet.getClass().getName()); + } + } + else{ + session.received(packet,peer); + } + } + }catch(SocketException ex){ + logger.log(Level.INFO, "SocketException: "+ex.getMessage()); + }catch(SocketTimeoutException ste){ + //can safely ignore... we will retry until the endpoint is stopped + }catch(Exception ex){ + logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); + } + } + } + + protected void doSend(UDTPacket packet)throws IOException{ + byte[]data=packet.getEncoded(); + DatagramPacket dgp = packet.getSession().getDatagram(); + dgp.setData(data); + dgSocket.send(dgp); + } + + public String toString(){ + return "UDPEndpoint port="+port; + } + + public void sendRaw(DatagramPacket p)throws IOException{ + dgSocket.send(p); + } +} Modified: udt-java/skunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 06:54:24 UTC (rev 61) @@ -48,23 +48,23 @@ public class UDTClient { private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); - private final UDPEndPoint clientEndpoint; + private final UDPMultiplexer clientEndpoint; private ClientSession clientSession; public UDTClient(InetAddress address, int localport)throws SocketException, UnknownHostException{ //create endpoint - clientEndpoint= UDPEndPoint.get(address,localport); + clientEndpoint= UDPMultiplexer.get(address,localport); logger.info("Created client endpoint on port "+localport); } public UDTClient(InetAddress address)throws SocketException, UnknownHostException{ //create endpoint - clientEndpoint= UDPEndPoint.get(address, 0); + clientEndpoint= UDPMultiplexer.get(address, 0); logger.info("Created client endpoint on port "+clientEndpoint.getLocalPort()); } - public UDTClient(UDPEndPoint endpoint)throws SocketException, UnknownHostException{ + public UDTClient(UDPMultiplexer endpoint)throws SocketException, UnknownHostException{ clientEndpoint=endpoint; } @@ -153,7 +153,7 @@ return clientSession.getSocket().getOutputStream(); } - public UDPEndPoint getEndpoint()throws IOException{ + public UDPMultiplexer getEndpoint()throws IOException{ return clientEndpoint; } Modified: udt-java/skunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 06:54:24 UTC (rev 61) @@ -175,7 +175,7 @@ statistics.setSendPeriod(packetSendingPeriod); } - private final long PS=UDPEndPoint.DATAGRAM_SIZE; + private final long PS=UDPMultiplexer.DATAGRAM_SIZE; private final double BetaDivPS=0.0000015/PS; //see spec page 16 @@ -184,7 +184,7 @@ double remaining=estimatedLinkCapacity-1000000.0/packetSendingPeriod; if(remaining<=0){ - return 1.0/UDPEndPoint.DATAGRAM_SIZE; + return 1.0/UDPMultiplexer.DATAGRAM_SIZE; } else{ double exp=Math.ceil(Math.log10(remaining*PS*8)); Modified: udt-java/skunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 06:54:24 UTC (rev 61) @@ -67,7 +67,7 @@ private static final Logger logger=Logger.getLogger(UDTReceiver.class.getName()); - private final UDPEndPoint endpoint; + private final UDPMultiplexer endpoint; private final UDTSession session; @@ -159,7 +159,7 @@ * create a receiver with a valid {@link UDTSession} * @param session */ - public UDTReceiver(UDTSession session,UDPEndPoint endpoint){ + public UDTReceiver(UDTSession session,UDPMultiplexer endpoint){ this.endpoint = endpoint; this.session=session; this.sessionUpSince=System.currentTimeMillis(); Modified: udt-java/skunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 06:54:24 UTC (rev 61) @@ -67,7 +67,7 @@ private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); - private final UDPEndPoint endpoint; + private final UDPMultiplexer endpoint; private final UDTSession session; private final UDTStatistics statistics; @@ -117,7 +117,7 @@ private final boolean storeStatistics; private final int chunksize; - public UDTSender(UDTSession session,UDPEndPoint endpoint){ + public UDTSender(UDTSession session,UDPMultiplexer endpoint){ if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); this.endpoint= endpoint; this.session=session; Modified: udt-java/skunk/src/main/java/udt/UDTServerSocket.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 06:54:24 UTC (rev 61) @@ -48,7 +48,7 @@ public class UDTServerSocket extends ServerSocket { private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); - private volatile UDPEndPoint endpoint; + private volatile UDPMultiplexer endpoint; private volatile InetAddress localAdd; private volatile int locPort; private volatile SocketAddress localSocketAddress; @@ -64,7 +64,7 @@ */ public UDTServerSocket(InetAddress localAddress, int port)throws UnknownHostException, IOException{ super(); - endpoint= UDPEndPoint.get(localAddress,port); + endpoint= UDPMultiplexer.get(localAddress,port); localAdd = localAddress; locPort = port; bound = true; @@ -105,7 +105,7 @@ throw new IOException("UDTSession was null"); } - public UDPEndPoint getEndpoint(){ + public UDPMultiplexer getEndpoint(){ return endpoint; } Modified: udt-java/skunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 06:54:24 UTC (rev 61) @@ -91,7 +91,7 @@ protected int localPort; - public static final int DEFAULT_DATAGRAM_SIZE=UDPEndPoint.DATAGRAM_SIZE; + public static final int DEFAULT_DATAGRAM_SIZE=UDPMultiplexer.DATAGRAM_SIZE; /** * key for a system property defining the CC class to be used Modified: udt-java/skunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 06:54:24 UTC (rev 61) @@ -62,7 +62,7 @@ = new ArrayList<UDTSocketAddress>(120); //endpoint - private volatile UDPEndPoint endpoint; + private volatile UDPMultiplexer endpoint; private volatile boolean active; private volatile boolean connected; @@ -90,7 +90,7 @@ * @param endpoint * @throws SocketException,UnknownHostException */ - UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{ + UDTSocket(UDPMultiplexer endpoint, UDTSession session)throws SocketException,UnknownHostException{ super(); this.endpoint=endpoint; this.session=session; @@ -105,7 +105,7 @@ public UDTSocket(InetAddress host, int port ) throws SocketException, UnknownHostException{ super(); - this.endpoint = UDPEndPoint.get(host, port); + this.endpoint = UDPMultiplexer.get(host, port); this.session = null; this.receiver = null; this.sender = null; @@ -193,7 +193,7 @@ if (boundSockets.contains(bindpoint)) throw new IOException("A socket is already bound to this address"); } - endpoint = UDPEndPoint.get(bindpoint); + endpoint = UDPMultiplexer.get(bindpoint); if (endpoint == null) throw new SocketException("Failed to bind to UDPEndPoint"); bound = true; } @@ -517,7 +517,7 @@ return active; } - public UDPEndPoint getEndpoint() { + public UDPMultiplexer getEndpoint() { return endpoint; } Modified: udt-java/skunk/src/main/java/udt/util/Util.java =================================================================== --- udt-java/skunk/src/main/java/udt/util/Util.java 2011-08-05 04:24:00 UTC (rev 60) +++ udt-java/skunk/src/main/java/udt/util/Util.java 2011-08-05 06:54:24 UTC (rev 61) @@ -40,7 +40,7 @@ import java.net.InetAddress; import java.security.MessageDigest; -import udt.UDPEndPoint; +import udt.UDPMultiplexer; /** * helper methods @@ -150,7 +150,7 @@ * @return the local port that can now be accessed by the client * @throws IOException */ - public static void doHolePunch(UDPEndPoint endpoint,InetAddress client, int clientPort)throws IOException{ + public static void doHolePunch(UDPMultiplexer endpoint,InetAddress client, int clientPort)throws IOException{ DatagramPacket p=new DatagramPacket(new byte[1],1); p.setAddress(client); p.setPort(clientPort); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |