[Udt-java-commits] SF.net SVN: udt-java:[61] udt-java/skunk/src/main/java/udt
Status: Alpha
Brought to you by:
bschuller
|
From: <pe...@us...> - 2011-08-05 06:54:32
|
Revision: 61
http://udt-java.svn.sourceforge.net/udt-java/?rev=61&view=rev
Author: pete_
Date: 2011-08-05 06:54:24 +0000 (Fri, 05 Aug 2011)
Log Message:
-----------
Renamed UDPEndPoint to UDPMultiplexer, this will make the implementation easier to understand for new comers who've read UDTv4: Improvements in Performance and Usability.
Modified Paths:
--------------
udt-java/skunk/src/main/java/udt/ClientSession.java
udt-java/skunk/src/main/java/udt/ServerSession.java
udt-java/skunk/src/main/java/udt/UDTClient.java
udt-java/skunk/src/main/java/udt/UDTCongestionControl.java
udt-java/skunk/src/main/java/udt/UDTReceiver.java
udt-java/skunk/src/main/java/udt/UDTSender.java
udt-java/skunk/src/main/java/udt/UDTServerSocket.java
udt-java/skunk/src/main/java/udt/UDTSession.java
udt-java/skunk/src/main/java/udt/UDTSocket.java
udt-java/skunk/src/main/java/udt/util/Util.java
Added Paths:
-----------
udt-java/skunk/src/main/java/udt/UDPMultiplexer.java
Removed Paths:
-------------
udt-java/skunk/src/main/java/udt/UDPEndPoint.java
Modified: udt-java/skunk/src/main/java/udt/ClientSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -50,9 +50,9 @@
private static final Logger logger=Logger.getLogger(ClientSession.class.getName());
- private UDPEndPoint endPoint;
+ private UDPMultiplexer endPoint;
- public ClientSession(UDPEndPoint endPoint, UDTSocketAddress dest)throws SocketException{
+ public ClientSession(UDPMultiplexer endPoint, UDTSocketAddress dest)throws SocketException{
super("ClientSession localPort="+endPoint.getLocalPort(),dest);
this.endPoint=endPoint;
logger.info("Created "+toString());
Modified: udt-java/skunk/src/main/java/udt/ServerSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -51,12 +51,12 @@
private static final Logger logger=Logger.getLogger(ServerSession.class.getName());
- private final UDPEndPoint endPoint;
+ private final UDPMultiplexer endPoint;
//last received packet (for testing purposes)
private UDTPacket lastPacket;
- public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{
+ public ServerSession(DatagramPacket dp, UDPMultiplexer endPoint)throws SocketException,UnknownHostException{
super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new UDTSocketAddress(dp.getAddress(),dp.getPort(),0));
this.endPoint=endPoint;
logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort());
Deleted: udt-java/skunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -1,540 +0,0 @@
-/*********************************************************************************
- * Copyright (c) 2010 Forschungszentrum Juelich GmbH
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * (1) Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the disclaimer at the end. Redistributions in
- * binary form must reproduce the above copyright notice, this list of
- * conditions and the following disclaimer in the documentation and/or other
- * materials provided with the distribution.
- *
- * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its
- * contributors may be used to endorse or promote products derived from this
- * software without specific prior written permission.
- *
- * DISCLAIMER
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *********************************************************************************/
-
-package udt;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.WeakHashMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import udt.packets.ConnectionHandshake;
-import udt.packets.UDTSocketAddress;
-import udt.packets.PacketFactory;
-import udt.util.ObjectPool;
-import udt.util.UDTThreadFactory;
-
-/**
- * the UDPEndpoint takes care of sending and receiving UDP network packets,
- * dispatching them to the correct {@link UDTSession}
- */
-public class UDPEndPoint {
-
- //class fields
- private static final Logger logger=Logger.getLogger(ClientSession.class.getName());
- public static final int DATAGRAM_SIZE=1400;
-
-
- //class methods
- private static final WeakHashMap<SocketAddress, UDPEndPoint> localEndpoints
- = new WeakHashMap<SocketAddress, UDPEndPoint>();
-
- public static UDPEndPoint get(DatagramSocket socket){
- SocketAddress localInetSocketAddress = null;
- UDPEndPoint result = null;
- if ( socket.isBound()){
- SocketAddress sa = socket.getLocalSocketAddress();
- if ( sa instanceof InetSocketAddress ){
- localInetSocketAddress = (InetSocketAddress) sa;
- } else {
- // Must be a special DatagramSocket impl or extended.
- localInetSocketAddress =
- new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
- }
- synchronized (localEndpoints){
- result = localEndpoints.get(localInetSocketAddress);
- }
- }
- if (result != null) return result;
- try {
- result = new UDPEndPoint(socket);
- if (localInetSocketAddress == null){
- // The DatagramSocket was unbound, it should be bound now.
- localInetSocketAddress =
- new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
- }
- } catch (SocketException ex) {
- Logger.getLogger(UDPEndPoint.class.getName()).log(Level.SEVERE, null, ex);
- }
- if (result != null){
- synchronized (localEndpoints){
- UDPEndPoint exists = localEndpoints.get(localInetSocketAddress);
- if (exists != null && exists.getSocket().equals(socket)) result = exists;
- // Only cache if a record doesn't already exist.
- else if (exists == null) localEndpoints.put(localInetSocketAddress, result);
- }
- }
- return result; // may be null.
- }
-
-
-
- public static UDPEndPoint get(InetAddress localAddress, int localPort){
- InetSocketAddress localInetSocketAddress = new InetSocketAddress(localAddress, localPort);
- return get(localInetSocketAddress);
- }
-
- public static UDPEndPoint get(SocketAddress localSocketAddress){
- InetSocketAddress localInetSocketAddress = null;
- if (localSocketAddress instanceof InetSocketAddress){
- localInetSocketAddress = (InetSocketAddress) localSocketAddress;
- } else if (localSocketAddress instanceof UDTSocketAddress){
- UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress;
- localInetSocketAddress =
- new InetSocketAddress(udtSA.getAddress(), udtSA.getPort());
- }
- if (localInetSocketAddress == null) return null;
- UDPEndPoint result = null;
- synchronized (localEndpoints){
- result = localEndpoints.get(localInetSocketAddress);
- }
- if (result != null) return result;
- try {
- result = new UDPEndPoint(localInetSocketAddress);
- if (localInetSocketAddress.getPort() == 0 ||
- localInetSocketAddress.getAddress().isAnyLocalAddress()){
- // ephemeral port or wildcard address, bind operation is complete.
- localInetSocketAddress =
- new InetSocketAddress(result.getLocalAddress(), result.getLocalPort());
- }
- } catch (SocketException ex) {
- logger.log(Level.SEVERE, null, ex);
- } catch (UnknownHostException ex) {
- logger.log(Level.SEVERE, null, ex);
- }
- if (result != null){
- synchronized (localEndpoints){
- UDPEndPoint exists = localEndpoints.get(localInetSocketAddress);
- if (exists != null) result = exists;
- else localEndpoints.put(localInetSocketAddress, result);
- }
- }
- return result; // may be null.
- }
-
- /**
- * Allows a custom endpoint to be added to the pool.
- * @param endpoint
- */
- public static void put(UDPEndPoint endpoint){
- SocketAddress local = endpoint.getSocket().getLocalSocketAddress();
- synchronized (localEndpoints){
- localEndpoints.put(local, endpoint);
- }
- }
-
- //object fields
- private final int port;
-
- private final DatagramSocket dgSocket;
-
- //active sessions keyed by socket ID
- private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>();
-
- //last received packet
- private UDTPacket lastPacket;
-
- //if the endpoint is configured for a server socket,
- //this queue is used to handoff new UDTSessions to the application
- private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>();
-
- private final ObjectPool<BlockingQueue<UDTSession>> queuePool
- = new ObjectPool<BlockingQueue<UDTSession>>(20);
-
- private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff
- = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120);
-
- // registered sockets
- private final Set<Integer> registeredSockets = new HashSet<Integer>(120);
- // registered sockets lock
- private final ReadWriteLock rwl = new ReentrantReadWriteLock();
- private final Lock readSocketLock = rwl.readLock();
- private final Lock writeSocketLock = rwl.writeLock();
-
-
- private boolean serverSocketMode=false;
-
- //has the endpoint been stopped?
- private volatile boolean stopped=false;
-
- private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000));
-
-
- /**
- * create an endpoint on the given socket
- *
- * @param socket - a UDP datagram socket
- * @throws SocketException
- */
- protected UDPEndPoint(DatagramSocket socket) throws SocketException{
- this.dgSocket=socket;
- if (!socket.isBound()){
- socket.bind(null);
- }
- port=dgSocket.getLocalPort();
- }
-
- /**
- * bind to any local port on the given host address
- * @param localAddress
- * @throws SocketException
- * @throws UnknownHostException
- */
- private UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{
- this(localAddress,0);
- }
-
- /**
- * Bind to the given address and port
- * @param localAddress
- * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port.
- * @throws SocketException
- * @throws UnknownHostException
- */
- private UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{
- if(localAddress==null){
- dgSocket=new DatagramSocket(localPort, localAddress);
- }else{
- dgSocket=new DatagramSocket(localPort);
- }
- if(localPort>0)this.port = localPort;
- else port=dgSocket.getLocalPort();
-
- configureSocket();
- }
-
- private UDPEndPoint (InetSocketAddress localSocketAddress)
- throws SocketException, UnknownHostException {
- dgSocket = new DatagramSocket(localSocketAddress);
- port = dgSocket.getLocalPort();
- configureSocket();
- }
-
- protected void configureSocket()throws SocketException{
- //set a time out to avoid blocking in doReceive()
- dgSocket.setSoTimeout(100000);
- //buffer size
- dgSocket.setReceiveBufferSize(128*1024);
- dgSocket.setReuseAddress(false);
- }
-
- /**
- * bind to the default network interface on the machine
- *
- * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port.
- * @throws SocketException
- * @throws UnknownHostException
- */
- public UDPEndPoint(int localPort)throws SocketException, UnknownHostException{
- this(null,localPort);
- }
-
- /**
- * bind to an ephemeral port on the default network interface on the machine
- *
- * @throws SocketException
- * @throws UnknownHostException
- */
- public UDPEndPoint()throws SocketException, UnknownHostException{
- this(null,0);
- }
-
- /**
- * start the endpoint. If the serverSocketModeEnabled flag is <code>true</code>,
- * a new connection can be handed off to an application. The application needs to
- * call #accept() to get the socket
- * @param serverSocketModeEnabled
- */
- public void start(boolean serverSocketModeEnabled){
- serverSocketMode=serverSocketModeEnabled;
- //start receive thread
- Runnable receive=new Runnable(){
- public void run(){
- try{
- doReceive();
- }catch(Exception ex){
- logger.log(Level.WARNING,"",ex);
- }
- }
- };
- Thread t=UDTThreadFactory.get().newThread(receive);
- t.setName("UDPEndpoint-"+t.getName());
- t.setDaemon(true);
- t.start();
- logger.info("UDTEndpoint started.");
- }
-
- public void start(){
- start(false);
- }
-
- public void stop(){
- stopped=true;
- dgSocket.close();
- }
-
- /**
- * Provides assistance to a socket to determine a random socket id,
- * every caller receives a unique value. This value is unique at the
- * time of calling, however it may not be at registration time.
- *
- * This socketID has not been registered, all socket ID's must be
- * registered or connection will fail.
- * @return
- */
- public int getUniqueSocketID(){
- Integer socketID = nextSocketID.getAndIncrement();
- try{
- readSocketLock.lock();
- while (registeredSockets.contains(socketID)){
- socketID = nextSocketID.getAndIncrement();
- }
- return socketID; // should we register it?
- } finally {
- readSocketLock.unlock();
- }
- }
-
- void registerSocketID(int socketID, UDTSocket socket) throws SocketException {
- if (!equals(socket.getEndpoint())) throw new SocketException (
- "Socket doesn't originate for this endpoint: "
- + socket.toString());
- try {
- writeSocketLock.lock();
- if (registeredSockets.contains(socketID)){
- throw new SocketException("Already registered, Socket ID: " +socketID);
- }
- registeredSockets.add(socketID);
- }finally{
- writeSocketLock.unlock();
- }
- }
-
- /**
- * @return the port which this client is bound to
- */
- public int getLocalPort() {
- return this.dgSocket.getLocalPort();
- }
- /**
- * @return Gets the local address to which the socket is bound
- */
- public InetAddress getLocalAddress(){
- return this.dgSocket.getLocalAddress();
- }
-
- DatagramSocket getSocket(){
- return dgSocket;
- }
-
- UDTPacket getLastPacket(){
- return lastPacket;
- }
-
- public void addSession(Integer destinationID,UDTSession session){
- logger.log(Level.INFO, "Storing session <{0}>", destinationID);
- sessions.put(destinationID, session);
- }
-
- public UDTSession getSession(Long destinationID){
- return sessions.get(destinationID);
- }
-
- public Collection<UDTSession> getSessions(){
- return sessions.values();
- }
-
- /**
- * wait the given time for a new connection
- * @param timeout - the time to wait
- * @param unit - the {@link TimeUnit}
- * @param socketID - the socket id.
- * @return a new {@link UDTSession}
- * @throws InterruptedException
- */
- protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{
- //return sessionHandoff.poll(timeout, unit);
- BlockingQueue<UDTSession> session = handoff.get(socketID);
- try {
- if (session == null){
- session = queuePool.get();
- if (session == null) {
- session = new ArrayBlockingQueue<UDTSession>(1);
- }
- BlockingQueue<UDTSession> existed = handoff.putIfAbsent(socketID,session);
- if (existed != null){
- session = existed;
- }
- }
- return session.poll(timeout, unit);
- } finally {
- boolean removed = handoff.remove(socketID, session);
- if (removed){
- session.clear();
- queuePool.accept(session);
- }
- }
- }
-
-
- final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE);
-
- /**
- * single receive, run in the receiverThread, see {@link #start()}
- * <ul>
- * <li>Receives UDP packets from the network</li>
- * <li>Converts them to UDT packets</li>
- * <li>dispatches the UDT packets according to their destination ID.</li>
- * </ul>
- * @throws IOException
- */
- private long lastDestID=-1;
- private UDTSession lastSession;
-
- private int n=0;
-
- private final Object lock=new Object();
-
- protected void doReceive()throws IOException{
- while(!stopped){
- try{
- //will block until a packet is received or timeout has expired
- dgSocket.receive(dp);
- UDTSocketAddress peer= null;
- int l=dp.getLength();
- UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
- lastPacket=packet;
- //handle connection handshake
- if(packet.isConnectionHandshake()){
- synchronized(lock){
- Long id=Long.valueOf(packet.getDestinationID());
- UDTSession session=sessions.get(id);
- if(session==null){ // What about DOS?
- session=new ServerSession(dp,this);
- addSession(session.getSocketID(),session);
- //TODO need to check peer to avoid duplicate server session
- if(serverSocketMode){
- logger.fine("Pooling new request.");
-// sessionHandoff.put(session); // blocking method, what about offer?
- BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID());
- if (queue != null){
- boolean success = queue.offer(session);
- if (success){
- logger.fine("Request taken for processing.");
- } else {
- logger.fine("Request discarded, queue full.");
- }
- } else {
- logger.fine("No ServerSocket listening at socketID: "
- + session.getSocketID()
- + "to answer request");
- }
- }
- }
- peer.setSocketID(((ConnectionHandshake)packet).getSocketID());
- peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(),
- ((ConnectionHandshake)packet).getSocketID());
- session.received(packet,peer);
- }
- }
- else{
- //dispatch to existing session
- long dest=packet.getDestinationID();
- UDTSession session;
- if(dest==lastDestID){
- session=lastSession;
- }
- else{
- session=sessions.get(dest);
- lastSession=session;
- lastDestID=dest;
- }
- if(session==null){
- n++;
- if(n%100==1){
- logger.warning("Unknown session <"+dest
- +"> requested from <"+peer+"> packet type "
- +packet.getClass().getName());
- }
- }
- else{
- session.received(packet,peer);
- }
- }
- }catch(SocketException ex){
- logger.log(Level.INFO, "SocketException: "+ex.getMessage());
- }catch(SocketTimeoutException ste){
- //can safely ignore... we will retry until the endpoint is stopped
- }catch(Exception ex){
- logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex);
- }
- }
- }
-
- protected void doSend(UDTPacket packet)throws IOException{
- byte[]data=packet.getEncoded();
- DatagramPacket dgp = packet.getSession().getDatagram();
- dgp.setData(data);
- dgSocket.send(dgp);
- }
-
- public String toString(){
- return "UDPEndpoint port="+port;
- }
-
- public void sendRaw(DatagramPacket p)throws IOException{
- dgSocket.send(p);
- }
-}
Copied: udt-java/skunk/src/main/java/udt/UDPMultiplexer.java (from rev 60, udt-java/skunk/src/main/java/udt/UDPEndPoint.java)
===================================================================
--- udt-java/skunk/src/main/java/udt/UDPMultiplexer.java (rev 0)
+++ udt-java/skunk/src/main/java/udt/UDPMultiplexer.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -0,0 +1,540 @@
+/*********************************************************************************
+ * Copyright (c) 2010 Forschungszentrum Juelich GmbH
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * (1) Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the disclaimer at the end. Redistributions in
+ * binary form must reproduce the above copyright notice, this list of
+ * conditions and the following disclaimer in the documentation and/or other
+ * materials provided with the distribution.
+ *
+ * (2) Neither the name of Forschungszentrum Juelich GmbH nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * DISCLAIMER
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *********************************************************************************/
+
+package udt;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import udt.packets.ConnectionHandshake;
+import udt.packets.UDTSocketAddress;
+import udt.packets.PacketFactory;
+import udt.util.ObjectPool;
+import udt.util.UDTThreadFactory;
+
+/**
+ * the UDPMultiplexer takes care of sending and receiving UDP network packets,
+ * dispatching them to the correct {@link UDTSession}
+ */
+public class UDPMultiplexer {
+
+ //class fields
+ private static final Logger logger=Logger.getLogger(ClientSession.class.getName());
+ public static final int DATAGRAM_SIZE=1400;
+
+
+ //class methods
+ private static final WeakHashMap<SocketAddress, UDPMultiplexer> localEndpoints
+ = new WeakHashMap<SocketAddress, UDPMultiplexer>();
+
+ public static UDPMultiplexer get(DatagramSocket socket){
+ SocketAddress localInetSocketAddress = null;
+ UDPMultiplexer result = null;
+ if ( socket.isBound()){
+ SocketAddress sa = socket.getLocalSocketAddress();
+ if ( sa instanceof InetSocketAddress ){
+ localInetSocketAddress = (InetSocketAddress) sa;
+ } else {
+ // Must be a special DatagramSocket impl or extended.
+ localInetSocketAddress =
+ new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+ }
+ synchronized (localEndpoints){
+ result = localEndpoints.get(localInetSocketAddress);
+ }
+ }
+ if (result != null) return result;
+ try {
+ result = new UDPMultiplexer(socket);
+ if (localInetSocketAddress == null){
+ // The DatagramSocket was unbound, it should be bound now.
+ localInetSocketAddress =
+ new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+ }
+ } catch (SocketException ex) {
+ Logger.getLogger(UDPMultiplexer.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ if (result != null){
+ synchronized (localEndpoints){
+ UDPMultiplexer exists = localEndpoints.get(localInetSocketAddress);
+ if (exists != null && exists.getSocket().equals(socket)) result = exists;
+ // Only cache if a record doesn't already exist.
+ else if (exists == null) localEndpoints.put(localInetSocketAddress, result);
+ }
+ }
+ return result; // may be null.
+ }
+
+
+
+ public static UDPMultiplexer get(InetAddress localAddress, int localPort){
+ InetSocketAddress localInetSocketAddress = new InetSocketAddress(localAddress, localPort);
+ return get(localInetSocketAddress);
+ }
+
+ public static UDPMultiplexer get(SocketAddress localSocketAddress){
+ InetSocketAddress localInetSocketAddress = null;
+ if (localSocketAddress instanceof InetSocketAddress){
+ localInetSocketAddress = (InetSocketAddress) localSocketAddress;
+ } else if (localSocketAddress instanceof UDTSocketAddress){
+ UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress;
+ localInetSocketAddress =
+ new InetSocketAddress(udtSA.getAddress(), udtSA.getPort());
+ }
+ if (localInetSocketAddress == null) return null;
+ UDPMultiplexer result = null;
+ synchronized (localEndpoints){
+ result = localEndpoints.get(localInetSocketAddress);
+ }
+ if (result != null) return result;
+ try {
+ result = new UDPMultiplexer(localInetSocketAddress);
+ if (localInetSocketAddress.getPort() == 0 ||
+ localInetSocketAddress.getAddress().isAnyLocalAddress()){
+ // ephemeral port or wildcard address, bind operation is complete.
+ localInetSocketAddress =
+ new InetSocketAddress(result.getLocalAddress(), result.getLocalPort());
+ }
+ } catch (SocketException ex) {
+ logger.log(Level.SEVERE, null, ex);
+ } catch (UnknownHostException ex) {
+ logger.log(Level.SEVERE, null, ex);
+ }
+ if (result != null){
+ synchronized (localEndpoints){
+ UDPMultiplexer exists = localEndpoints.get(localInetSocketAddress);
+ if (exists != null) result = exists;
+ else localEndpoints.put(localInetSocketAddress, result);
+ }
+ }
+ return result; // may be null.
+ }
+
+ /**
+ * Allows a custom endpoint to be added to the pool.
+ * @param endpoint
+ */
+ public static void put(UDPMultiplexer endpoint){
+ SocketAddress local = endpoint.getSocket().getLocalSocketAddress();
+ synchronized (localEndpoints){
+ localEndpoints.put(local, endpoint);
+ }
+ }
+
+ //object fields
+ private final int port;
+
+ private final DatagramSocket dgSocket;
+
+ //active sessions keyed by socket ID
+ private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>();
+
+ //last received packet
+ private UDTPacket lastPacket;
+
+ //if the endpoint is configured for a server socket,
+ //this queue is used to handoff new UDTSessions to the application
+ private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>();
+
+ private final ObjectPool<BlockingQueue<UDTSession>> queuePool
+ = new ObjectPool<BlockingQueue<UDTSession>>(20);
+
+ private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff
+ = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120);
+
+ // registered sockets
+ private final Set<Integer> registeredSockets = new HashSet<Integer>(120);
+ // registered sockets lock
+ private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+ private final Lock readSocketLock = rwl.readLock();
+ private final Lock writeSocketLock = rwl.writeLock();
+
+
+ private boolean serverSocketMode=false;
+
+ //has the endpoint been stopped?
+ private volatile boolean stopped=false;
+
+ private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000));
+
+
+ /**
+ * create an endpoint on the given socket
+ *
+ * @param socket - a UDP datagram socket
+ * @throws SocketException
+ */
+ protected UDPMultiplexer(DatagramSocket socket) throws SocketException{
+ this.dgSocket=socket;
+ if (!socket.isBound()){
+ socket.bind(null);
+ }
+ port=dgSocket.getLocalPort();
+ }
+
+ /**
+ * bind to any local port on the given host address
+ * @param localAddress
+ * @throws SocketException
+ * @throws UnknownHostException
+ */
+ private UDPMultiplexer(InetAddress localAddress)throws SocketException, UnknownHostException{
+ this(localAddress,0);
+ }
+
+ /**
+ * Bind to the given address and port
+ * @param localAddress
+ * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port.
+ * @throws SocketException
+ * @throws UnknownHostException
+ */
+ private UDPMultiplexer(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{
+ if(localAddress==null){
+ dgSocket=new DatagramSocket(localPort, localAddress);
+ }else{
+ dgSocket=new DatagramSocket(localPort);
+ }
+ if(localPort>0)this.port = localPort;
+ else port=dgSocket.getLocalPort();
+
+ configureSocket();
+ }
+
+ private UDPMultiplexer (InetSocketAddress localSocketAddress)
+ throws SocketException, UnknownHostException {
+ dgSocket = new DatagramSocket(localSocketAddress);
+ port = dgSocket.getLocalPort();
+ configureSocket();
+ }
+
+ protected void configureSocket()throws SocketException{
+ //set a time out to avoid blocking in doReceive()
+ dgSocket.setSoTimeout(100000);
+ //buffer size
+ dgSocket.setReceiveBufferSize(128*1024);
+ dgSocket.setReuseAddress(false);
+ }
+
+ /**
+ * bind to the default network interface on the machine
+ *
+ * @param localPort - the port to bind to. If the port is zero, the system will pick an ephemeral port.
+ * @throws SocketException
+ * @throws UnknownHostException
+ */
+ public UDPMultiplexer(int localPort)throws SocketException, UnknownHostException{
+ this(null,localPort);
+ }
+
+ /**
+ * bind to an ephemeral port on the default network interface on the machine
+ *
+ * @throws SocketException
+ * @throws UnknownHostException
+ */
+ public UDPMultiplexer()throws SocketException, UnknownHostException{
+ this(null,0);
+ }
+
+ /**
+ * start the endpoint. If the serverSocketModeEnabled flag is <code>true</code>,
+ * a new connection can be handed off to an application. The application needs to
+ * call #accept() to get the socket
+ * @param serverSocketModeEnabled
+ */
+ public void start(boolean serverSocketModeEnabled){
+ serverSocketMode=serverSocketModeEnabled;
+ //start receive thread
+ Runnable receive=new Runnable(){
+ public void run(){
+ try{
+ doReceive();
+ }catch(Exception ex){
+ logger.log(Level.WARNING,"",ex);
+ }
+ }
+ };
+ Thread t=UDTThreadFactory.get().newThread(receive);
+ t.setName("UDPEndpoint-"+t.getName());
+ t.setDaemon(true);
+ t.start();
+ logger.info("UDTEndpoint started.");
+ }
+
+ public void start(){
+ start(false);
+ }
+
+ public void stop(){
+ stopped=true;
+ dgSocket.close();
+ }
+
+ /**
+ * Provides assistance to a socket to determine a random socket id,
+ * every caller receives a unique value. This value is unique at the
+ * time of calling, however it may not be at registration time.
+ *
+ * This socketID has not been registered, all socket ID's must be
+ * registered or connection will fail.
+ * @return
+ */
+ public int getUniqueSocketID(){
+ Integer socketID = nextSocketID.getAndIncrement();
+ try{
+ readSocketLock.lock();
+ while (registeredSockets.contains(socketID)){
+ socketID = nextSocketID.getAndIncrement();
+ }
+ return socketID; // should we register it?
+ } finally {
+ readSocketLock.unlock();
+ }
+ }
+
+ void registerSocketID(int socketID, UDTSocket socket) throws SocketException {
+ if (!equals(socket.getEndpoint())) throw new SocketException (
+ "Socket doesn't originate for this endpoint: "
+ + socket.toString());
+ try {
+ writeSocketLock.lock();
+ if (registeredSockets.contains(socketID)){
+ throw new SocketException("Already registered, Socket ID: " +socketID);
+ }
+ registeredSockets.add(socketID);
+ }finally{
+ writeSocketLock.unlock();
+ }
+ }
+
+ /**
+ * @return the port which this client is bound to
+ */
+ public int getLocalPort() {
+ return this.dgSocket.getLocalPort();
+ }
+ /**
+ * @return Gets the local address to which the socket is bound
+ */
+ public InetAddress getLocalAddress(){
+ return this.dgSocket.getLocalAddress();
+ }
+
+ DatagramSocket getSocket(){
+ return dgSocket;
+ }
+
+ UDTPacket getLastPacket(){
+ return lastPacket;
+ }
+
+ public void addSession(Integer destinationID,UDTSession session){
+ logger.log(Level.INFO, "Storing session <{0}>", destinationID);
+ sessions.put(destinationID, session);
+ }
+
+ public UDTSession getSession(Long destinationID){
+ return sessions.get(destinationID);
+ }
+
+ public Collection<UDTSession> getSessions(){
+ return sessions.values();
+ }
+
+ /**
+ * wait the given time for a new connection
+ * @param timeout - the time to wait
+ * @param unit - the {@link TimeUnit}
+ * @param socketID - the socket id.
+ * @return a new {@link UDTSession}
+ * @throws InterruptedException
+ */
+ protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{
+ //return sessionHandoff.poll(timeout, unit);
+ BlockingQueue<UDTSession> session = handoff.get(socketID);
+ try {
+ if (session == null){
+ session = queuePool.get();
+ if (session == null) {
+ session = new ArrayBlockingQueue<UDTSession>(1);
+ }
+ BlockingQueue<UDTSession> existed = handoff.putIfAbsent(socketID,session);
+ if (existed != null){
+ session = existed;
+ }
+ }
+ return session.poll(timeout, unit);
+ } finally {
+ boolean removed = handoff.remove(socketID, session);
+ if (removed){
+ session.clear();
+ queuePool.accept(session);
+ }
+ }
+ }
+
+
+ final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE);
+
+ /**
+ * single receive, run in the receiverThread, see {@link #start()}
+ * <ul>
+ * <li>Receives UDP packets from the network</li>
+ * <li>Converts them to UDT packets</li>
+ * <li>dispatches the UDT packets according to their destination ID.</li>
+ * </ul>
+ * @throws IOException
+ */
+ private long lastDestID=-1;
+ private UDTSession lastSession;
+
+ private int n=0;
+
+ private final Object lock=new Object();
+
+ protected void doReceive()throws IOException{
+ while(!stopped){
+ try{
+ //will block until a packet is received or timeout has expired
+ dgSocket.receive(dp);
+ UDTSocketAddress peer= null;
+ int l=dp.getLength();
+ UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
+ lastPacket=packet;
+ //handle connection handshake
+ if(packet.isConnectionHandshake()){
+ synchronized(lock){
+ Long id=Long.valueOf(packet.getDestinationID());
+ UDTSession session=sessions.get(id);
+ if(session==null){ // What about DOS?
+ session=new ServerSession(dp,this);
+ addSession(session.getSocketID(),session);
+ //TODO need to check peer to avoid duplicate server session
+ if(serverSocketMode){
+ logger.fine("Pooling new request.");
+// sessionHandoff.put(session); // blocking method, what about offer?
+ BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID());
+ if (queue != null){
+ boolean success = queue.offer(session);
+ if (success){
+ logger.fine("Request taken for processing.");
+ } else {
+ logger.fine("Request discarded, queue full.");
+ }
+ } else {
+ logger.fine("No ServerSocket listening at socketID: "
+ + session.getSocketID()
+ + "to answer request");
+ }
+ }
+ }
+ peer.setSocketID(((ConnectionHandshake)packet).getSocketID());
+ peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(),
+ ((ConnectionHandshake)packet).getSocketID());
+ session.received(packet,peer);
+ }
+ }
+ else{
+ //dispatch to existing session
+ long dest=packet.getDestinationID();
+ UDTSession session;
+ if(dest==lastDestID){
+ session=lastSession;
+ }
+ else{
+ session=sessions.get(dest);
+ lastSession=session;
+ lastDestID=dest;
+ }
+ if(session==null){
+ n++;
+ if(n%100==1){
+ logger.warning("Unknown session <"+dest
+ +"> requested from <"+peer+"> packet type "
+ +packet.getClass().getName());
+ }
+ }
+ else{
+ session.received(packet,peer);
+ }
+ }
+ }catch(SocketException ex){
+ logger.log(Level.INFO, "SocketException: "+ex.getMessage());
+ }catch(SocketTimeoutException ste){
+ //can safely ignore... we will retry until the endpoint is stopped
+ }catch(Exception ex){
+ logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex);
+ }
+ }
+ }
+
+ protected void doSend(UDTPacket packet)throws IOException{
+ byte[]data=packet.getEncoded();
+ DatagramPacket dgp = packet.getSession().getDatagram();
+ dgp.setData(data);
+ dgSocket.send(dgp);
+ }
+
+ public String toString(){
+ return "UDPEndpoint port="+port;
+ }
+
+ public void sendRaw(DatagramPacket p)throws IOException{
+ dgSocket.send(p);
+ }
+}
Modified: udt-java/skunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -48,23 +48,23 @@
public class UDTClient {
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
- private final UDPEndPoint clientEndpoint;
+ private final UDPMultiplexer clientEndpoint;
private ClientSession clientSession;
public UDTClient(InetAddress address, int localport)throws SocketException, UnknownHostException{
//create endpoint
- clientEndpoint= UDPEndPoint.get(address,localport);
+ clientEndpoint= UDPMultiplexer.get(address,localport);
logger.info("Created client endpoint on port "+localport);
}
public UDTClient(InetAddress address)throws SocketException, UnknownHostException{
//create endpoint
- clientEndpoint= UDPEndPoint.get(address, 0);
+ clientEndpoint= UDPMultiplexer.get(address, 0);
logger.info("Created client endpoint on port "+clientEndpoint.getLocalPort());
}
- public UDTClient(UDPEndPoint endpoint)throws SocketException, UnknownHostException{
+ public UDTClient(UDPMultiplexer endpoint)throws SocketException, UnknownHostException{
clientEndpoint=endpoint;
}
@@ -153,7 +153,7 @@
return clientSession.getSocket().getOutputStream();
}
- public UDPEndPoint getEndpoint()throws IOException{
+ public UDPMultiplexer getEndpoint()throws IOException{
return clientEndpoint;
}
Modified: udt-java/skunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -175,7 +175,7 @@
statistics.setSendPeriod(packetSendingPeriod);
}
- private final long PS=UDPEndPoint.DATAGRAM_SIZE;
+ private final long PS=UDPMultiplexer.DATAGRAM_SIZE;
private final double BetaDivPS=0.0000015/PS;
//see spec page 16
@@ -184,7 +184,7 @@
double remaining=estimatedLinkCapacity-1000000.0/packetSendingPeriod;
if(remaining<=0){
- return 1.0/UDPEndPoint.DATAGRAM_SIZE;
+ return 1.0/UDPMultiplexer.DATAGRAM_SIZE;
}
else{
double exp=Math.ceil(Math.log10(remaining*PS*8));
Modified: udt-java/skunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -67,7 +67,7 @@
private static final Logger logger=Logger.getLogger(UDTReceiver.class.getName());
- private final UDPEndPoint endpoint;
+ private final UDPMultiplexer endpoint;
private final UDTSession session;
@@ -159,7 +159,7 @@
* create a receiver with a valid {@link UDTSession}
* @param session
*/
- public UDTReceiver(UDTSession session,UDPEndPoint endpoint){
+ public UDTReceiver(UDTSession session,UDPMultiplexer endpoint){
this.endpoint = endpoint;
this.session=session;
this.sessionUpSince=System.currentTimeMillis();
Modified: udt-java/skunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -67,7 +67,7 @@
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
- private final UDPEndPoint endpoint;
+ private final UDPMultiplexer endpoint;
private final UDTSession session;
private final UDTStatistics statistics;
@@ -117,7 +117,7 @@
private final boolean storeStatistics;
private final int chunksize;
- public UDTSender(UDTSession session,UDPEndPoint endpoint){
+ public UDTSender(UDTSession session,UDPMultiplexer endpoint){
if(!session.isReady())throw new IllegalStateException("UDTSession is not ready.");
this.endpoint= endpoint;
this.session=session;
Modified: udt-java/skunk/src/main/java/udt/UDTServerSocket.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -48,7 +48,7 @@
public class UDTServerSocket extends ServerSocket {
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
- private volatile UDPEndPoint endpoint;
+ private volatile UDPMultiplexer endpoint;
private volatile InetAddress localAdd;
private volatile int locPort;
private volatile SocketAddress localSocketAddress;
@@ -64,7 +64,7 @@
*/
public UDTServerSocket(InetAddress localAddress, int port)throws UnknownHostException, IOException{
super();
- endpoint= UDPEndPoint.get(localAddress,port);
+ endpoint= UDPMultiplexer.get(localAddress,port);
localAdd = localAddress;
locPort = port;
bound = true;
@@ -105,7 +105,7 @@
throw new IOException("UDTSession was null");
}
- public UDPEndPoint getEndpoint(){
+ public UDPMultiplexer getEndpoint(){
return endpoint;
}
Modified: udt-java/skunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -91,7 +91,7 @@
protected int localPort;
- public static final int DEFAULT_DATAGRAM_SIZE=UDPEndPoint.DATAGRAM_SIZE;
+ public static final int DEFAULT_DATAGRAM_SIZE=UDPMultiplexer.DATAGRAM_SIZE;
/**
* key for a system property defining the CC class to be used
Modified: udt-java/skunk/src/main/java/udt/UDTSocket.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -62,7 +62,7 @@
= new ArrayList<UDTSocketAddress>(120);
//endpoint
- private volatile UDPEndPoint endpoint;
+ private volatile UDPMultiplexer endpoint;
private volatile boolean active;
private volatile boolean connected;
@@ -90,7 +90,7 @@
* @param endpoint
* @throws SocketException,UnknownHostException
*/
- UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{
+ UDTSocket(UDPMultiplexer endpoint, UDTSession session)throws SocketException,UnknownHostException{
super();
this.endpoint=endpoint;
this.session=session;
@@ -105,7 +105,7 @@
public UDTSocket(InetAddress host, int port ) throws SocketException,
UnknownHostException{
super();
- this.endpoint = UDPEndPoint.get(host, port);
+ this.endpoint = UDPMultiplexer.get(host, port);
this.session = null;
this.receiver = null;
this.sender = null;
@@ -193,7 +193,7 @@
if (boundSockets.contains(bindpoint)) throw
new IOException("A socket is already bound to this address");
}
- endpoint = UDPEndPoint.get(bindpoint);
+ endpoint = UDPMultiplexer.get(bindpoint);
if (endpoint == null) throw new SocketException("Failed to bind to UDPEndPoint");
bound = true;
}
@@ -517,7 +517,7 @@
return active;
}
- public UDPEndPoint getEndpoint() {
+ public UDPMultiplexer getEndpoint() {
return endpoint;
}
Modified: udt-java/skunk/src/main/java/udt/util/Util.java
===================================================================
--- udt-java/skunk/src/main/java/udt/util/Util.java 2011-08-05 04:24:00 UTC (rev 60)
+++ udt-java/skunk/src/main/java/udt/util/Util.java 2011-08-05 06:54:24 UTC (rev 61)
@@ -40,7 +40,7 @@
import java.net.InetAddress;
import java.security.MessageDigest;
-import udt.UDPEndPoint;
+import udt.UDPMultiplexer;
/**
* helper methods
@@ -150,7 +150,7 @@
* @return the local port that can now be accessed by the client
* @throws IOException
*/
- public static void doHolePunch(UDPEndPoint endpoint,InetAddress client, int clientPort)throws IOException{
+ public static void doHolePunch(UDPMultiplexer endpoint,InetAddress client, int clientPort)throws IOException{
DatagramPacket p=new DatagramPacket(new byte[1],1);
p.setAddress(client);
p.setPort(clientPort);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|